KAFKA-3771; Improve Kafka core code with more idiomatic Scala

- Used flatMap instead of map followed by flatten
- Used isEmpty, nonEmpty and isDefined as appropriate
- Used head, keys and keySet where appropriate
- Used contains, diff and find where appropriate
- Removed redundant val modifier from case class constructor parameters
- Declared toString without parentheses consistently, since it takes no parameters and has no side effects
- Removed unnecessary return statements, parentheses and semicolons
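
For reference, a minimal sketch of the idioms applied above; it is not taken
from the patch, and all names in it (IdiomSketch, Broker, ids, assignment,
isValidBrokerId) are hypothetical:

    // Hypothetical Scala example illustrating the cleanups in this change.
    object IdiomSketch extends App {
      // flatMap instead of map followed by flatten
      val ids: Seq[Int] = Seq(Seq(1, 2), Seq(3)).flatMap(identity)

      // nonEmpty instead of size > 0; head instead of indexing
      if (ids.nonEmpty) println(ids.head)

      // find instead of filter(...).headOption
      val firstEven: Option[Int] = ids.find(_ % 2 == 0)

      // isEmpty instead of !option.isDefined
      if (firstEven.isEmpty) println("no even id")

      // keys instead of map(_._1) on a Map
      val assignment = Map("t0" -> Seq(0, 1), "t1" -> Seq(2))
      println(assignment.keys.mkString(","))

      // toString declared without (): no parameters, no side effects
      case class Broker(id: Int) {
        override def toString: String = s"Broker($id)"
      }

      // no redundant return or parentheses: the last expression is the result
      def isValidBrokerId(brokerId: Int): Boolean = brokerId >= 0
      println(Broker(ids.head) + " valid=" + isValidBrokerId(ids.head))
    }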

Author: Joshi <[email protected]>
Author: Rekha Joshi <[email protected]>

Reviewers: Ismael Juma <[email protected]>

Closes #1451 from rekhajoshm/KAFKA-3771


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/79aaf19f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/79aaf19f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/79aaf19f

Branch: refs/heads/trunk
Commit: 79aaf19f24bb48f90404a3e3896d115107991f4c
Parents: 2c7fae0
Author: Rekha Joshi <[email protected]>
Authored: Mon Jun 6 08:08:06 2016 +0100
Committer: Ismael Juma <[email protected]>
Committed: Mon Jun 6 08:08:47 2016 +0100

----------------------------------------------------------------------
 .../main/scala/kafka/admin/TopicCommand.scala   |   2 +-
 core/src/main/scala/kafka/api/ApiVersion.scala  |   2 +-
 .../kafka/api/ControlledShutdownRequest.scala   |   2 +-
 .../src/main/scala/kafka/api/FetchRequest.scala |   2 +-
 .../kafka/api/GenericRequestAndHeader.scala     |   2 +-
 .../kafka/api/GenericResponseAndHeader.scala    |   2 +-
 .../src/main/scala/kafka/api/LeaderAndIsr.scala |   4 +-
 .../main/scala/kafka/api/OffsetRequest.scala    |   2 +-
 .../main/scala/kafka/api/OffsetResponse.scala   |   2 +-
 .../main/scala/kafka/api/ProducerRequest.scala  |   2 +-
 .../scala/kafka/api/RequestOrResponse.scala     |   2 +-
 .../main/scala/kafka/api/TopicMetadata.scala    |   4 +-
 .../scala/kafka/api/TopicMetadataRequest.scala  |   2 +-
 .../main/scala/kafka/client/ClientUtils.scala   |   4 +-
 core/src/main/scala/kafka/cluster/Cluster.scala |   2 +-
 .../main/scala/kafka/cluster/Partition.scala    |  14 +-
 core/src/main/scala/kafka/cluster/Replica.scala |   4 +-
 core/src/main/scala/kafka/common/AppInfo.scala  |   2 +-
 .../ZkNodeChangeNotificationListener.scala      |   5 +-
 .../main/scala/kafka/consumer/KafkaStream.scala |   2 +-
 .../kafka/consumer/PartitionAssignor.scala      |   2 +-
 .../kafka/consumer/PartitionTopicInfo.scala     |   2 +-
 .../controller/ControllerChannelManager.scala   |  12 +-
 .../kafka/controller/KafkaController.scala      |  70 ++++----
 .../controller/PartitionLeaderSelector.scala    |   2 +-
 .../controller/PartitionStateMachine.scala      |  12 +-
 .../kafka/controller/ReplicaStateMachine.scala  |   6 +-
 .../kafka/controller/TopicDeletionManager.scala |  16 +-
 .../kafka/coordinator/GroupCoordinator.scala    |   2 +-
 .../coordinator/GroupMetadataManager.scala      |   2 +-
 .../kafka/coordinator/MemberMetadata.scala      |   2 +-
 .../kafka/javaapi/TopicMetadataRequest.scala    |   2 +-
 core/src/main/scala/kafka/log/Log.scala         |   2 +-
 core/src/main/scala/kafka/log/LogSegment.scala  |   2 +-
 core/src/main/scala/kafka/message/Message.scala |   2 +-
 .../kafka/metrics/KafkaMetricsReporter.scala    |   8 +-
 .../kafka/network/RequestOrResponseSend.scala   |   2 +-
 .../kafka/producer/BrokerPartitionInfo.scala    |   2 +-
 .../kafka/producer/ProducerRequestStats.scala   |   2 +-
 .../producer/async/DefaultEventHandler.scala    |  14 +-
 .../security/auth/SimpleAclAuthorizer.scala     |   6 +-
 .../scala/kafka/server/ClientQuotaManager.scala |   2 +-
 .../scala/kafka/server/DelayedOperation.scala   |   2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |   2 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |   2 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   4 +-
 .../scala/kafka/server/ReplicaManager.scala     |  13 +-
 .../scala/kafka/tools/ConsumerPerformance.scala |   4 +-
 .../scala/kafka/tools/ProducerPerformance.scala |   2 +-
 .../kafka/tools/ReplicaVerificationTool.scala   |   2 +-
 .../kafka/tools/SimpleConsumerPerformance.scala |   4 +-
 .../scala/kafka/tools/SimpleConsumerShell.scala |  10 +-
 .../kafka/tools/StateChangeLogMerger.scala      |   8 +-
 .../kafka/tools/VerifyConsumerRebalance.scala   |   2 +-
 core/src/main/scala/kafka/utils/CoreUtils.scala |   4 +-
 .../src/main/scala/kafka/utils/ToolsUtils.scala |   2 +-
 .../kafka/utils/VerifiableProperties.scala      |   2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |   8 +-
 .../integration/kafka/api/AdminClientTest.scala |  22 +--
 .../kafka/api/AuthorizerIntegrationTest.scala   |  10 +-
 .../kafka/api/BaseConsumerTest.scala            |  52 +++---
 .../kafka/api/BaseProducerSendTest.scala        |  14 +-
 .../kafka/api/ConsumerBounceTest.scala          |   6 +-
 .../kafka/api/IntegrationTestHarness.scala      |   2 +-
 .../kafka/api/PlaintextConsumerTest.scala       | 164 +++++++++----------
 .../api/SaslMultiMechanismConsumerTest.scala    |   4 +-
 .../scala/kafka/tools/TestLogCleaning.scala     |   4 +-
 .../scala/other/kafka/TestOffsetManager.scala   |   2 +-
 .../unit/kafka/admin/AddPartitionsTest.scala    |   2 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala |  14 +-
 .../unit/kafka/admin/ConfigCommandTest.scala    |   2 +-
 .../unit/kafka/admin/DeleteTopicTest.scala      |  18 +-
 .../kafka/consumer/ConsumerIteratorTest.scala   |   4 +-
 .../ZookeeperConsumerConnectorTest.scala        |   2 +-
 .../controller/ControllerFailoverTest.scala     |   6 +-
 .../GroupCoordinatorResponseTest.scala          |   4 +-
 .../kafka/integration/PrimitiveApiTest.scala    |   6 +-
 .../ProducerConsumerTestHarness.scala           |   2 +-
 .../test/scala/unit/kafka/log/CleanerTest.scala |   4 +-
 .../unit/kafka/log/FileMessageSetTest.scala     |   2 +-
 .../kafka/log/LogCleanerIntegrationTest.scala   |   2 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   |   4 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |   4 +-
 .../message/ByteBufferMessageSetTest.scala      |   2 +-
 .../scala/unit/kafka/message/MessageTest.scala  |  12 +-
 .../unit/kafka/network/SocketServerTest.scala   |   2 +-
 .../unit/kafka/producer/ProducerTest.scala      |  10 +-
 .../unit/kafka/producer/SyncProducerTest.scala  |  10 +-
 .../security/auth/ZkAuthorizationTest.scala     |  10 +-
 .../kafka/server/BaseReplicaFetchTest.scala     |   2 +-
 .../kafka/server/DynamicConfigChangeTest.scala  |  12 +-
 .../server/HighwatermarkPersistenceTest.scala   |  12 +-
 .../unit/kafka/server/ISRExpirationTest.scala   |   2 +-
 .../unit/kafka/server/LeaderElectionTest.scala  |   3 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala |   2 +-
 .../unit/kafka/server/LogRecoveryTest.scala     |   6 +-
 .../server/ServerGenerateBrokerIdTest.scala     |   4 +-
 .../unit/kafka/server/SimpleFetchTest.scala     |  10 +-
 .../unit/kafka/tools/ConsoleProducerTest.scala  |   4 +-
 .../unit/kafka/utils/IteratorTemplateTest.scala |   2 +-
 .../scala/unit/kafka/utils/JaasTestUtils.scala  |   2 +-
 .../scala/unit/kafka/utils/MockScheduler.scala  |   4 +-
 .../test/scala/unit/kafka/utils/MockTime.scala  |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  18 +-
 104 files changed, 401 insertions(+), 392 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index c643a9d..39bfe62 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -216,7 +216,7 @@ object TopicCommand extends Logging {
              val leader = zkUtils.getLeaderForPartition(topic, partitionId)
              if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
                  (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
-                  (reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) {
+                  (reportUnavailablePartitions && (leader.isEmpty || !liveBrokers.contains(leader.get)))) {
                print("\tTopic: " + topic)
                print("\tPartition: " + partitionId)
                print("\tLeader: " + (if(leader.isDefined) leader.get else "none"))

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 2417d79..666d0e7 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -72,7 +72,7 @@ sealed trait ApiVersion extends Ordered[ApiVersion] {
   override def compare(that: ApiVersion): Int =
     ApiVersion.orderingByVersion.compare(this, that)
 
-  override def toString(): String = version
+  override def toString: String = version
 }
 
 // Keep the IDs in order of versions

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
index 42a17e6..52c8828 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -63,7 +63,7 @@ case class ControlledShutdownRequest(versionId: Short,
       4 /* broker id */
   }
 
-  override def toString(): String = {
+  override def toString: String = {
     describe(true)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 83e139a..f74bd1c 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -141,7 +141,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
 
   def numPartitions = requestInfo.size
 
-  override def toString(): String = {
+  override def toString: String = {
     describe(true)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala b/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
index b0c6d7a..cb5b95e 100644
--- a/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
+++ b/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
@@ -39,7 +39,7 @@ private[kafka] abstract class GenericRequestAndHeader(val versionId: Short,
     body.sizeOf()
   }
 
-  override def toString(): String = {
+  override def toString: String = {
     describe(true)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala b/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
index 748b5e9..2835fb6 100644
--- a/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
+++ b/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
@@ -32,7 +32,7 @@ private[kafka] abstract class GenericResponseAndHeader(val correlationId: Int,
     body.sizeOf()
   }
 
-  override def toString(): String = {
+  override def toString: String = {
     describe(true)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/LeaderAndIsr.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsr.scala b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
index 5de527c..e5813a5 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsr.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
@@ -35,7 +35,7 @@ object LeaderAndIsr {
 case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion: Int) {
   def this(leader: Int, isr: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, isr, LeaderAndIsr.initialZKVersion)
 
-  override def toString(): String = {
+  override def toString: String = {
     Json.encode(Map("leader" -> leader, "leader_epoch" -> leaderEpoch, "isr" -> isr))
   }
 }
@@ -83,7 +83,7 @@ case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControlle
     size
   }
 
-  override def toString(): String = {
+  override def toString: String = {
     val partitionStateInfo = new StringBuilder
     partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
     partitionStateInfo.append(",ReplicationFactor:" + replicationFactor + ")")

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index 59181d1..b15cf5a 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -109,7 +109,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
   def isFromOrdinaryClient = replicaId == Request.OrdinaryConsumerId
   def isFromDebuggingClient = replicaId == Request.DebuggingConsumerId
 
-  override def toString(): String = {
+  override def toString: String = {
     describe(true)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/OffsetResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetResponse.scala b/core/src/main/scala/kafka/api/OffsetResponse.scala
index bfb270f..b767c08 100644
--- a/core/src/main/scala/kafka/api/OffsetResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetResponse.scala
@@ -46,7 +46,7 @@ object OffsetResponse {
 
 
 case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long]) {
-  override def toString(): String = {
+  override def toString: String = {
     new String("error: " + Errors.forCode(error).exceptionName + " offsets: " 
+ offsets.mkString)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 30af841..aad2fa5 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -124,7 +124,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
 
   def numPartitions = data.size
 
-  override def toString(): String = {
+  override def toString: String = {
     describe(true)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/RequestOrResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala
index 73ec1d9..65b37fd 100644
--- a/core/src/main/scala/kafka/api/RequestOrResponse.scala
+++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala
@@ -26,7 +26,7 @@ object Request {
   val DebuggingConsumerId: Int = -2
 
   // Broker ids are non-negative int.
-  def isValidBrokerId(brokerId: Int): Boolean = (brokerId >= 0)
+  def isValidBrokerId(brokerId: Int): Boolean = brokerId >= 0
 }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index ae5ea58..815de21 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -57,7 +57,7 @@ case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadat
     partitionsMetadata.foreach(m => m.writeTo(buffer))
   }
 
-  override def toString(): String = {
+  override def toString: String = {
     val topicMetadataInfo = new StringBuilder
     topicMetadataInfo.append("{TopicMetadata for topic %s -> ".format(topic))
     Errors.forCode(errorCode) match {
@@ -138,7 +138,7 @@ case class PartitionMetadata(partitionId: Int,
     isr.foreach(r => buffer.putInt(r.id))
   }
 
-  override def toString(): String = {
+  override def toString: String = {
     val partitionMetadataString = new StringBuilder
     partitionMetadataString.append("\tpartition " + partitionId)
    partitionMetadataString.append("\tleader: " + (if(leader.isDefined) leader.get.toString else "none"))

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 0654e3d..107696d 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -57,7 +57,7 @@ case class TopicMetadataRequest(versionId: Short,
     topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
   }
 
-  override def toString(): String = {
+  override def toString: String = {
     describe(true)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index fd1fc26..f61a978 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -142,11 +142,11 @@ object ClientUtils extends Logging{
 
      var offsetManagerChannelOpt: Option[BlockingChannel] = None
 
-     while (!offsetManagerChannelOpt.isDefined) {
+     while (offsetManagerChannelOpt.isEmpty) {
 
        var coordinatorOpt: Option[BrokerEndPoint] = None
 
-       while (!coordinatorOpt.isDefined) {
+       while (coordinatorOpt.isEmpty) {
          try {
            if (!queryChannel.isConnected)
              queryChannel = channelToAnyBroker(zkUtils)

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/cluster/Cluster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Cluster.scala b/core/src/main/scala/kafka/cluster/Cluster.scala
index 992c54e..75bbec0 100644
--- a/core/src/main/scala/kafka/cluster/Cluster.scala
+++ b/core/src/main/scala/kafka/cluster/Cluster.scala
@@ -40,6 +40,6 @@ private[kafka] class Cluster {
   
   def size = brokers.size
   
-  override def toString(): String = 
+  override def toString: String =
     "Cluster(" + brokers.values.mkString(", ") + ")"  
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ea22e87..a561a97 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -64,7 +64,7 @@ class Partition(val topic: String,
   private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
  this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)
 
-  private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
+  private def isReplicaLocal(replicaId: Int) : Boolean = replicaId == localBrokerId
   val tags = Map("topic" -> topic, "partition" -> partitionId.toString)
 
   newGauge("UnderReplicated",
@@ -158,7 +158,7 @@ class Partition(val topic: String,
   }
 
   def getLeaderEpoch(): Int = {
-    return this.leaderEpoch
+    this.leaderEpoch
   }
 
   /**
@@ -381,9 +381,9 @@ class Partition(val topic: String,
       leaderReplicaIfLocal() match {
         case Some(leaderReplica) =>
          val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
-          if(outOfSyncReplicas.size > 0) {
+          if(outOfSyncReplicas.nonEmpty) {
            val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
-            assert(newInSyncReplicas.size > 0)
+            assert(newInSyncReplicas.nonEmpty)
            info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
              inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
             // update ISR in zk and in cache
@@ -421,7 +421,7 @@ class Partition(val topic: String,
     val candidateReplicas = inSyncReplicas - leaderReplica
 
    val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
-    if(laggingReplicas.size > 0)
+    if(laggingReplicas.nonEmpty)
      debug("Lagging replicas for partition %s are %s".format(TopicAndPartition(topic, partitionId), laggingReplicas.map(_.brokerId).mkString(",")))
 
     laggingReplicas
@@ -484,7 +484,7 @@ class Partition(val topic: String,
   }
 
   override def equals(that: Any): Boolean = {
-    if(!(that.isInstanceOf[Partition]))
+    if(!that.isInstanceOf[Partition])
       return false
     val other = that.asInstanceOf[Partition]
     if(topic.equals(other.topic) && partitionId == other.partitionId)
@@ -496,7 +496,7 @@ class Partition(val topic: String,
     31 + topic.hashCode() + 17*partitionId
   }
 
-  override def toString(): String = {
+  override def toString: String = {
     val partitionString = new StringBuilder
     partitionString.append("Topic: " + topic)
     partitionString.append("; Partition: " + partitionId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 740e835..dfb203a 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -98,7 +98,7 @@ class Replica(val brokerId: Int,
   }
 
   override def equals(that: Any): Boolean = {
-    if(!(that.isInstanceOf[Replica]))
+    if(!that.isInstanceOf[Replica])
       return false
     val other = that.asInstanceOf[Replica]
    if(topic.equals(other.topic) && brokerId == other.brokerId && partition.equals(other.partition))
@@ -111,7 +111,7 @@ class Replica(val brokerId: Int,
   }
 
 
-  override def toString(): String = {
+  override def toString: String = {
     val replicaString = new StringBuilder
     replicaString.append("ReplicaId: " + brokerId)
     replicaString.append("; Topic: " + topic)

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/common/AppInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/AppInfo.scala b/core/src/main/scala/kafka/common/AppInfo.scala
index 8e2f49d..f77bdf5 100644
--- a/core/src/main/scala/kafka/common/AppInfo.scala
+++ b/core/src/main/scala/kafka/common/AppInfo.scala
@@ -42,7 +42,7 @@ object AppInfo extends KafkaMetricsGroup {
     newGauge("CommitID",
       new Gauge[String] {
         def value = {
-          AppInfoParser.getCommitId();
+          AppInfoParser.getCommitId()
         }
       })
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index baddecc..580ae33 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -92,7 +92,7 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
           if (changeId > lastExecutedChange) {
             val changeZnode = seqNodeRoot + "/" + notification
             val (data, stat) = zkUtils.readDataMaybeNull(changeZnode)
-            data map (notificationHandler.processNotification(_)) getOrElse (logger.warn(s"read null data from $changeZnode when processing notification $notification"))
+            data.map(notificationHandler.processNotification(_)).getOrElse {
+              logger.warn(s"read null data from $changeZnode when processing notification $notification")
+            }
           }
           lastExecutedChange = changeId
         }
@@ -107,6 +109,7 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
 
   /**
    * Purges expired notifications.
+   *
    * @param now
    * @param notifications
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/consumer/KafkaStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/KafkaStream.scala b/core/src/main/scala/kafka/consumer/KafkaStream.scala
index 805e916..aebf3ea 100644
--- a/core/src/main/scala/kafka/consumer/KafkaStream.scala
+++ b/core/src/main/scala/kafka/consumer/KafkaStream.scala
@@ -45,7 +45,7 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
     iter.clearCurrentChunk()
   }
 
-  override def toString(): String = {
+  override def toString: String = {
      "%s kafka stream".format(clientId)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
index 5a1bdd0..96fe690 100755
--- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -76,7 +76,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
     val partitionAssignment =
       new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
 
-    if (ctx.consumersForTopic.size > 0) {
+    if (ctx.consumersForTopic.nonEmpty) {
       // check conditions (a) and (b)
      val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet)
       ctx.consumersForTopic.foreach { case (topic, threadIds) =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
index 9c779ce..c7c7836 100644
--- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
@@ -67,7 +67,7 @@ class PartitionTopicInfo(val topic: String,
     }
   }
   
-  override def toString(): String = topic + ":" + partitionId.toString + ": fetched offset = " + fetchedOffset.get +
+  override def toString: String = topic + ":" + partitionId.toString + ": fetched offset = " + fetchedOffset.get +
     ": consumed offset = " + consumedOffset.get
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index b4059a4..32478ca 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -257,13 +257,13 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
 
   def newBatch() {
     // raise error if the previous batch is not empty
-    if (leaderAndIsrRequestMap.size > 0)
+    if (leaderAndIsrRequestMap.nonEmpty)
       throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating " +
         "a new one. Some LeaderAndIsr state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()))
-    if (stopReplicaRequestMap.size > 0)
+    if (stopReplicaRequestMap.nonEmpty)
       throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
         "new one. Some StopReplica state changes %s might be lost ".format(stopReplicaRequestMap.toString()))
-    if (updateMetadataRequestMap.size > 0)
+    if (updateMetadataRequestMap.nonEmpty)
       throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
         "new one. Some UpdateMetadata state changes %s might be lost ".format(updateMetadataRequestMap.toString()))
   }
@@ -424,15 +424,15 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
       stopReplicaRequestMap.clear()
     } catch {
       case e : Throwable => {
-        if (leaderAndIsrRequestMap.size > 0) {
+        if (leaderAndIsrRequestMap.nonEmpty) {
           error("Haven't been able to send leader and isr requests, current 
state of " +
               s"the map is $leaderAndIsrRequestMap. Exception message: $e")
         }
-        if (updateMetadataRequestMap.size > 0) {
+        if (updateMetadataRequestMap.nonEmpty) {
           error("Haven't been able to send metadata update requests, current 
state of " +
               s"the map is $updateMetadataRequestMap. Exception message: $e")
         }
-        if (stopReplicaRequestMap.size > 0) {
+        if (stopReplicaRequestMap.nonEmpty) {
           error("Haven't been able to send stop replica requests, current 
state of " +
               s"the map is $stopReplicaRequestMap. Exception message: $e")
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index d533a85..1584cc9 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -71,7 +71,7 @@ class ControllerContext(val zkUtils: ZkUtils,
 
   // getter
  def liveBrokers = liveBrokersUnderlying.filter(broker => !shuttingDownBrokerIds.contains(broker.id))
-  def liveBrokerIds = liveBrokerIdsUnderlying.filter(brokerId => !shuttingDownBrokerIds.contains(brokerId))
+  def liveBrokerIds = liveBrokerIdsUnderlying -- shuttingDownBrokerIds
 
   def liveOrShuttingDownBrokerIds = liveBrokerIdsUnderlying
   def liveOrShuttingDownBrokers = liveBrokersUnderlying
@@ -84,22 +84,23 @@ class ControllerContext(val zkUtils: ZkUtils,
   }
 
   def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = {
-    brokerIds.map { brokerId =>
+    brokerIds.flatMap { brokerId =>
       partitionReplicaAssignment
-        .filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) }
-        .map { case(topicAndPartition, replicas) =>
-                 new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId) }
-    }.flatten.toSet
+        .filter { case (topicAndPartition, replicas) => replicas.contains(brokerId) }
+        .map { case (topicAndPartition, replicas) =>
+          new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId)
+        }
+    }.toSet
   }
 
   def replicasForTopic(topic: String): Set[PartitionAndReplica] = {
     partitionReplicaAssignment
-      .filter { case(topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }
-      .map { case(topicAndPartition, replicas) =>
+      .filter { case (topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }
+      .flatMap { case (topicAndPartition, replicas) =>
        replicas.map { r =>
          new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, r)
         }
-    }.flatten.toSet
+      }.toSet
   }
 
   def partitionsForTopic(topic: String): collection.Set[TopicAndPartition] = {
@@ -112,10 +113,10 @@ class ControllerContext(val zkUtils: ZkUtils,
   }
 
  def replicasForPartition(partitions: collection.Set[TopicAndPartition]): collection.Set[PartitionAndReplica] = {
-    partitions.map { p =>
+    partitions.flatMap { p =>
      val replicas = partitionReplicaAssignment(p)
      replicas.map(r => new PartitionAndReplica(p.topic, p.partition, r))
-    }.flatten
+    }
   }
 
   def removeTopic(topic: String) = {
@@ -139,7 +140,7 @@ object KafkaController extends Logging {
       Json.parseFull(controllerInfoString) match {
         case Some(m) =>
           val controllerInfo = m.asInstanceOf[Map[String, Any]]
-          return controllerInfo.get("brokerid").get.asInstanceOf[Int]
+          controllerInfo.get("brokerid").get.asInstanceOf[Int]
        case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
       }
     } catch {
@@ -148,7 +149,7 @@ object KafkaController extends Logging {
         warn("Failed to parse the controller info as json. "
           + "Probably this controller is still using the old format [%s] to 
store the broker id in zookeeper".format(controllerInfoString))
         try {
-          return controllerInfoString.toInt
+          controllerInfoString.toInt
         } catch {
          case t: Throwable => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t)
         }
@@ -298,7 +299,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
         controllerContext.partitionLeadershipInfo.filter {
           case (topicAndPartition, leaderIsrAndControllerEpoch) =>
            leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
-        }.map(_._1)
+        }.keys
       }
       replicatedPartitionsBrokerLeads().toSet
     }
@@ -439,7 +440,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
     // check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists
     // on the newly restarted brokers, there is a chance that topic deletion can resume
     val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
-    if(replicasForTopicsToBeDeleted.size > 0) {
+    if(replicasForTopicsToBeDeleted.nonEmpty) {
      info(("Some replicas %s for topics scheduled for deletion %s are on the newly restarted brokers %s. " +
        "Signaling restart of topic deletion for these topics").format(replicasForTopicsToBeDeleted.mkString(","),
        deleteTopicManager.topicsToBeDeleted.mkString(","), newBrokers.mkString(",")))
@@ -479,7 +480,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
     replicaStateMachine.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica)
     // check if topic deletion state for the dead replicas needs to be updated
     val replicasForTopicsToBeDeleted = allReplicasOnDeadBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
-    if(replicasForTopicsToBeDeleted.size > 0) {
+    if(replicasForTopicsToBeDeleted.nonEmpty) {
      // it is required to mark the respective replicas in TopicDeletionFailed state since the replica cannot be
      // deleted when the broker is down. This will prevent the replica from being in TopicDeletionStarted state indefinitely
      // since topic deletion cannot be retried until at least one replica is in TopicDeletionStarted state
@@ -780,9 +781,9 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
     val reassignedPartitions = partitionsBeingReassigned.filter { partition =>
       val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition._1)
       val topicDeleted = replicasOpt.isEmpty
-      val successful = if(!topicDeleted) replicasOpt.get == partition._2.newReplicas else false
+      val successful = if (!topicDeleted) replicasOpt.get == partition._2.newReplicas else false
       topicDeleted || successful
-    }.map(_._1)
+    }.keys
     reassignedPartitions.foreach(p => removePartitionFromReassignedPartitions(p))
     var partitionsToReassign: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
     partitionsToReassign ++= partitionsBeingReassigned
@@ -992,7 +993,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
                                          newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]]) {
     try {
       val zkPath = getTopicPath(topicAndPartition.topic)
-      val jsonPartitionMap = zkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2)))
+      val jsonPartitionMap = zkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => e._1.partition.toString -> e._2))
       zkUtils.updatePersistentPath(zkPath, jsonPartitionMap)
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
     } catch {
@@ -1021,6 +1022,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
   /**
    * Send the leader information for selected partitions to selected brokers so that they can correctly respond to
    * metadata requests
+   *
    * @param brokers The brokers that the update metadata request should be sent to
    */
   def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
@@ -1043,6 +1045,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
   /**
    * Removes a given partition replica from the ISR; if it is not the current
    * leader and there are sufficient remaining replicas in ISR.
+   *
    * @param topic topic
    * @param partition partition
    * @param replicaId replica Id
@@ -1109,6 +1112,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
 
   /**
    * Does not change leader or isr, but just increments the leader epoch
+   *
    * @param topic topic
    * @param partition partition
   * @return the new leaderAndIsr with an incremented leader epoch, or None if leaderAndIsr is empty.
@@ -1162,8 +1166,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
      * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
      * any ephemeral nodes here.
      *
-     * @throws Exception
-     *             On any error.
+     * @throws Exception On any error.
      */
     @throws(classOf[Exception])
     def handleNewSession() {
@@ -1219,8 +1222,8 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
                  // do this check only if the broker is live and there are no partitions being reassigned currently
                   // and preferred replica election is not in progress
                   if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
-                      controllerContext.partitionsBeingReassigned.size == 0 &&
-                      controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
+                      controllerContext.partitionsBeingReassigned.isEmpty &&
+                      controllerContext.partitionsUndergoingPreferredReplicaElection.isEmpty &&
                       !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
                       controllerContext.allTopics.contains(topicPartition.topic)) {
                     onPreferredReplicaElection(Set(topicPartition), true)
@@ -1250,6 +1253,7 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL
 
   /**
    * Invoked when some partitions are reassigned by the admin command
+   *
    * @throws Exception On any error.
    */
   @throws(classOf[Exception])
@@ -1276,8 +1280,8 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL
 
   /**
   * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader
-   * @throws Exception
-   *             On any error.
+   *
+   * @throws Exception On any error.
    */
   @throws(classOf[Exception])
   def handleDataDeleted(dataPath: String) {
@@ -1293,6 +1297,7 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
 
   /**
    * Invoked when some partitions need to move leader to preferred replica
+   *
    * @throws Exception On any error.
    */
   @throws(classOf[Exception])
@@ -1343,6 +1348,7 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
 
 /**
  * Called when leader intimates of isr change
+ *
  * @param controller
  */
 class IsrChangeNotificationListener(controller: KafkaController) extends IZkChildListener with Logging {
@@ -1354,7 +1360,7 @@ class IsrChangeNotificationListener(controller: KafkaController) extends IZkChil
       debug("[IsrChangeNotificationListener] Fired!!!")
       val childrenAsScala: mutable.Buffer[String] = currentChildren.asScala
       try {
-        val topicAndPartitions: immutable.Set[TopicAndPartition] = childrenAsScala.map(x => getTopicAndPartition(x)).flatten.toSet
+        val topicAndPartitions: immutable.Set[TopicAndPartition] = childrenAsScala.flatMap(x => getTopicAndPartition(x)).toSet
         if (topicAndPartitions.nonEmpty) {
           controller.updateLeaderAndIsrCache(topicAndPartitions)
           processUpdateNotifications(topicAndPartitions)
@@ -1417,6 +1423,7 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD
 
   /**
    * Invoked when some partitions are reassigned by the admin command
+   *
    * @throws Exception On any error.
    */
   @throws(classOf[Exception])
@@ -1425,12 +1432,12 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD
             .format(dataPath, data.toString))
     inLock(controllerContext.controllerLock) {
      val partitionsForPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
-      if(controllerContext.partitionsUndergoingPreferredReplicaElection.size > 0)
+      if(controllerContext.partitionsUndergoingPreferredReplicaElection.nonEmpty)
        info("These partitions are already undergoing preferred replica election: %s"
          .format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
      val partitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
      val partitionsForTopicsToBeDeleted = partitions.filter(p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
-      if(partitionsForTopicsToBeDeleted.size > 0) {
+      if(partitionsForTopicsToBeDeleted.nonEmpty) {
        error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
           .format(partitionsForTopicsToBeDeleted))
       }
@@ -1439,8 +1446,7 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD
   }
 
   /**
-   * @throws Exception
-   *             On any error.
+   * @throws Exception On any error.
    */
   @throws(classOf[Exception])
   def handleDataDeleted(dataPath: String) {
@@ -1451,13 +1457,13 @@ case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
                                        var isrChangeListener: ReassignedPartitionsIsrChangeListener = null)
 
 case class PartitionAndReplica(topic: String, partition: Int, replica: Int) {
-  override def toString(): String = {
+  override def toString: String = {
     "[Topic=%s,Partition=%d,Replica=%d]".format(topic, partition, replica)
   }
 }
 
 case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) {
-  override def toString(): String = {
+  override def toString: String = {
     val leaderAndIsrInfo = new StringBuilder
     leaderAndIsrInfo.append("(Leader:" + leaderAndIsr.leader)
     leaderAndIsrInfo.append(",ISR:" + leaderAndIsr.isr.mkString(","))

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 682ce1d..9517523 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -183,7 +183,7 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
     val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
 
     val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
-    liveAssignedReplicas.filter(newIsr.contains).headOption match {
+    liveAssignedReplicas.find(newIsr.contains) match {
       case Some(newLeader) =>
         debug("Partition %s : current leader = %d, new leader = 
%d".format(topicAndPartition, currentLeader, newLeader))
         (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, 
currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas)

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 47efc51..bf5fde4 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -115,7 +115,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions
       // that belong to topics to be deleted
       for((topicAndPartition, partitionState) <- partitionState
-          if(!controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic))) {
+          if !controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic)) {
         if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
           handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector,
                             (new CallbackBuilder).build)
@@ -432,7 +432,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
             controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
             info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
               deletedTopics, addedPartitionReplicaAssignment))
-            if(newTopics.size > 0)
+            if(newTopics.nonEmpty)
               controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
           } catch {
             case e: Throwable => error("Error while handling new topic", e )
@@ -463,13 +463,13 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           (children: Buffer[String]).toSet
         }
         debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
-        val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
-        if(nonExistentTopics.size > 0) {
+        val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
+        if(nonExistentTopics.nonEmpty) {
           warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
           nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
         }
         topicsToBeDeleted --= nonExistentTopics
-        if(topicsToBeDeleted.size > 0) {
+        if(topicsToBeDeleted.nonEmpty) {
           info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
           // mark topic ineligible for deletion if other state changes are in progress
           topicsToBeDeleted.foreach { topic =>
@@ -512,7 +512,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
             error("Skipping adding partitions %s for topic %s since it is currently being deleted"
                   .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
           else {
-            if (partitionsToBeAdded.size > 0) {
+            if (partitionsToBeAdded.nonEmpty) {
               info("New partitions to be added %s".format(partitionsToBeAdded))
               controllerContext.partitionReplicaAssignment.++=(partitionsToBeAdded)
               controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index d49b6af..d4e9bb4 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -107,7 +107,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
    */
   def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,
                          callbacks: Callbacks = (new CallbackBuilder).build) {
-    if(replicas.size > 0) {
+    if(replicas.nonEmpty) {
       info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
       try {
         brokerRequestBatch.newBatch()
@@ -370,9 +370,9 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
              newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
              deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
-              if(newBrokerIds.size > 0)
+              if(newBrokerIds.nonEmpty)
                controller.onBrokerStartup(newBrokerIdsSorted)
-              if(deadBrokerIds.size > 0)
+              if(deadBrokerIds.nonEmpty)
                controller.onBrokerFailure(deadBrokerIdsSorted)
            } catch {
              case e: Throwable => error("Error while handling broker changes", e)

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index c6f80ac..f24c69c 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -95,7 +95,7 @@ class TopicDeletionManager(controller: KafkaController,
   def start() {
     if (isDeleteTopicEnabled) {
       deleteTopicsThread = new DeleteTopicsThread()
-      if (topicsToBeDeleted.size > 0)
+      if (topicsToBeDeleted.nonEmpty)
         deleteTopicStateChanged.set(true)
       deleteTopicsThread.start()
     }
@@ -142,7 +142,7 @@ class TopicDeletionManager(controller: KafkaController,
   def resumeDeletionForTopics(topics: Set[String] = Set.empty) {
     if(isDeleteTopicEnabled) {
       val topicsToResumeDeletion = topics & topicsToBeDeleted
-      if(topicsToResumeDeletion.size > 0) {
+      if(topicsToResumeDeletion.nonEmpty) {
         topicsIneligibleForDeletion --= topicsToResumeDeletion
         resumeTopicDeletionThread()
       }
@@ -160,7 +160,7 @@ class TopicDeletionManager(controller: KafkaController,
   def failReplicaDeletion(replicas: Set[PartitionAndReplica]) {
     if(isDeleteTopicEnabled) {
      val replicasThatFailedToDelete = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
-      if(replicasThatFailedToDelete.size > 0) {
+      if(replicasThatFailedToDelete.nonEmpty) {
         val topics = replicasThatFailedToDelete.map(_.topic)
         debug("Deletion failed for replicas %s. Halting deletion for topics %s"
           .format(replicasThatFailedToDelete.mkString(","), topics))
@@ -182,7 +182,7 @@ class TopicDeletionManager(controller: KafkaController,
     if(isDeleteTopicEnabled) {
       val newTopicsToHaltDeletion = topicsToBeDeleted & topics
       topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
-      if(newTopicsToHaltDeletion.size > 0)
+      if(newTopicsToHaltDeletion.nonEmpty)
         info("Halted deletion of topics 
%s".format(newTopicsToHaltDeletion.mkString(",")))
     }
   }
@@ -310,7 +310,7 @@ class TopicDeletionManager(controller: KafkaController,
     controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
     val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
     topics.foreach { topic =>
-      onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).map(_._1).toSet)
+      onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).keySet)
     }
   }
 
@@ -343,7 +343,7 @@ class TopicDeletionManager(controller: KafkaController,
       debug("Deletion started for replicas 
%s".format(replicasForDeletionRetry.mkString(",")))
       
controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, 
ReplicaDeletionStarted,
         new 
Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build)
-      if(deadReplicasForTopic.size > 0) {
+      if(deadReplicasForTopic.nonEmpty) {
         debug("Dead Replicas (%s) found for topic 
%s".format(deadReplicasForTopic.mkString(","), topic))
         markTopicIneligibleForDeletion(Set(topic))
       }
@@ -373,7 +373,7 @@ class TopicDeletionManager(controller: KafkaController,
     val responseMap = stopReplicaResponse.responses.asScala
     val partitionsInError =
       if (stopReplicaResponse.errorCode != Errors.NONE.code) responseMap.keySet
-      else responseMap.filter { case (_, error) => error != Errors.NONE.code }.map(_._1).toSet
+      else responseMap.filter { case (_, error) => error != Errors.NONE.code }.keySet
     val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
     inLock(controllerContext.controllerLock) {
       // move all the failed replicas to ReplicaDeletionIneligible
@@ -397,7 +397,7 @@ class TopicDeletionManager(controller: KafkaController,
       inLock(controllerContext.controllerLock) {
         val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted
 
-        if(!topicsQueuedForDeletion.isEmpty)
+        if(topicsQueuedForDeletion.nonEmpty)
           info("Handling deletion for topics " + 
topicsQueuedForDeletion.mkString(","))
 
         topicsQueuedForDeletion.foreach { topic =>

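The hunks above apply two of the clean-ups from the commit message: nonEmpty in place of "size > 0", and keySet in place of "map(_._1).toSet" on a Map. A minimal standalone sketch of both idioms (names are hypothetical, not from the Kafka codebase):

  // Hypothetical assignment data, for illustration only.
  val assignment = Map("t1-0" -> Seq(1, 2), "t1-1" -> Seq(2, 3))
  // size may traverse the whole collection; nonEmpty inspects at most one element.
  if (assignment.nonEmpty)
    println("have assignments")
  // map(_._1).toSet builds an intermediate collection and converts it;
  // keySet returns the map's key set directly.
  val partitions: Set[String] = assignment.keySet
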
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index f445764..e9bbbd3 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -647,7 +647,7 @@ class GroupCoordinator(val brokerId: Int,
   def onCompleteJoin(group: GroupMetadata) {
     group synchronized {
       val failedMembers = group.notYetRejoinedMembers
-      if (group.isEmpty || !failedMembers.isEmpty) {
+      if (group.isEmpty || failedMembers.nonEmpty) {
         failedMembers.foreach { failedMember =>
           group.remove(failedMember.memberId)
           // TODO: cut the socket connection to the client

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index c6bc44e..b968f97 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -108,7 +108,7 @@ class GroupMetadataManager(val brokerId: Int,
 
  def isGroupLoading(groupId: String): Boolean = loadingPartitions synchronized loadingPartitions.contains(partitionFor(groupId))
 
-  def isLoading(): Boolean = loadingPartitions synchronized !loadingPartitions.isEmpty
+  def isLoading(): Boolean = loadingPartitions synchronized loadingPartitions.nonEmpty
 
   /**
    * Get the group associated with the given groupId, or null if not found

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
index 1d799f2..c57b990 100644
--- a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
@@ -90,7 +90,7 @@ private[coordinator] class MemberMetadata(val memberId: String,
       if (p1._1 != p2._1 || !util.Arrays.equals(p1._2, p2._2))
         return false
     }
-    return true
+    true
   }
 
   def summary(protocol: String): MemberSummary = {

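The change above drops a redundant return: in Scala the last expression of a method body is its value. A small sketch of the same shape (method and data are hypothetical, not Kafka code):

  def allEqual(xs: Seq[Int]): Boolean = {
    for (x <- xs)
      if (x != xs.head)
        return false  // an early exit still needs an explicit return
    true              // the final expression needs none
  }
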
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index 92d9073..f625ba0 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -44,7 +44,7 @@ class TopicMetadataRequest(val versionId: Short,
 
   def sizeInBytes: Int = underlying.sizeInBytes()
 
-  override def toString(): String = {
+  override def toString: String = {
     describe(true)
   }
 

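As the commit message notes, toString takes no parameters and has no side effect, so the override drops the empty parentheses for consistent usage. A sketch of the convention (the class is hypothetical):

  class Endpoint(host: String, port: Int) {
    // Side-effect-free and parameterless, so declared without ().
    override def toString: String = s"Endpoint($host:$port)"
  }
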
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 62dc7a1..76cd86e 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -802,7 +802,7 @@ class Log(val dir: File,
     }
   }
 
-  override def toString() = "Log(" + dir + ")"
+  override def toString = "Log(" + dir + ")"
 
   /**
   * This method performs an asynchronous log segment delete by doing the following:

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 37f7579..6bbc50c 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -203,7 +203,7 @@ class LogSegment(val log: FileMessageSet,
     truncated
   }
 
-  override def toString() = "LogSegment(baseOffset=" + baseOffset + ", size=" 
+ size + ")"
+  override def toString = "LogSegment(baseOffset=" + baseOffset + ", size=" + 
size + ")"
 
   /**
    * Truncate off all index and log entries with offsets >= the given offset.

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/message/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala
index 2ab2e0c..bb91078 100755
--- a/core/src/main/scala/kafka/message/Message.scala
+++ b/core/src/main/scala/kafka/message/Message.scala
@@ -402,7 +402,7 @@ class Message(val buffer: ByteBuffer,
       throw new IllegalArgumentException(s"Invalid timestamp $timestamp. 
Timestamp must be ${NoTimestamp} when magic = ${MagicValue_V0}")
   }
 
-  override def toString(): String = {
+  override def toString: String = {
     if (magic == MagicValue_V0)
       s"Message(magic = $magic, attributes = $attributes, crc = $checksum, key 
= $key, payload = $payload)"
     else

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
index 0d6da34..999b2a4 100755
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
@@ -54,12 +54,14 @@ object KafkaMetricsReporter {
     ReporterStarted synchronized {
       if (!ReporterStarted.get()) {
         val metricsConfig = new KafkaMetricsConfig(verifiableProps)
-        if(metricsConfig.reporters.size > 0) {
+        if(metricsConfig.reporters.nonEmpty) {
           metricsConfig.reporters.foreach(reporterType => {
            val reporter = CoreUtils.createObject[KafkaMetricsReporter](reporterType)
            reporter.init(verifiableProps)
-            if (reporter.isInstanceOf[KafkaMetricsReporterMBean])
-              CoreUtils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName)
+            reporter match {
+              case bean: KafkaMetricsReporterMBean => CoreUtils.registerMBean(reporter, bean.getMBeanName)
+              case _ =>
+            }
           })
           ReporterStarted.set(true)
         }

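The hunk above replaces an isInstanceOf check followed by an asInstanceOf cast with a type pattern, which tests the runtime type and binds the cast value in one step. A self-contained sketch of the idiom (trait and class names are hypothetical):

  trait Reporter
  class PlainReporter extends Reporter
  class MBeanReporter(val mBeanName: String) extends Reporter

  def maybeRegister(reporter: Reporter): Unit = reporter match {
    case bean: MBeanReporter => println(s"registering ${bean.mBeanName}")
    case _ =>  // not an MBean reporter; nothing to register
  }

  maybeRegister(new MBeanReporter("kafka:type=example"))
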
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/network/RequestOrResponseSend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestOrResponseSend.scala b/core/src/main/scala/kafka/network/RequestOrResponseSend.scala
index 364f24b..153d636 100644
--- a/core/src/main/scala/kafka/network/RequestOrResponseSend.scala
+++ b/core/src/main/scala/kafka/network/RequestOrResponseSend.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.network.NetworkSend
 
 object RequestOrResponseSend {
   def serialize(request: RequestOrResponse): ByteBuffer = {
-    val buffer = ByteBuffer.allocate(request.sizeInBytes + (if(request.requestId != None) 2 else 0))
+    val buffer = ByteBuffer.allocate(request.sizeInBytes + (if(request.requestId.isDefined) 2 else 0))
     request.requestId match {
       case Some(requestId) =>
         buffer.putShort(requestId)

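Comparing an Option against None works, but isDefined states the intent directly, and the match that follows already covers both cases. A minimal sketch (the value is hypothetical):

  val requestId: Option[Short] = Some(1.toShort)
  // isDefined reads better than requestId != None.
  val headerBytes = if (requestId.isDefined) 2 else 0
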
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index 4616c7e..97289a1 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -55,7 +55,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
           }
       }
     val partitionMetadata = metadata.partitionsMetadata
-    if(partitionMetadata.size == 0) {
+    if(partitionMetadata.isEmpty) {
       if(metadata.errorCode != Errors.NONE.code) {
         throw new KafkaException(Errors.forCode(metadata.errorCode).exception)
       } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
index 8ab948a..92bbbcf 100644
--- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
@@ -30,7 +30,7 @@ class ProducerRequestMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup
 
  val requestTimer = new KafkaTimer(newTimer("ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags))
  val requestSizeHist = newHistogram("ProducerRequestSize", biased = true, tags)
-  val throttleTimeStats = newTimer("ProducerRequestThrottleRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags);
+  val throttleTimeStats = newTimer("ProducerRequestThrottleRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index b79e64b..f9591ad 100755
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -66,7 +66,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     var remainingRetries = config.messageSendMaxRetries + 1
     val correlationIdStart = correlationId.get()
     debug("Handling %d events".format(events.size))
-    while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
+    while (remainingRetries > 0 && outstandingProduceRequests.nonEmpty) {
       topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
       if (topicMetadataRefreshInterval >= 0 &&
          SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
@@ -76,7 +76,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
         lastTopicMetadataRefreshTime = SystemTime.milliseconds
       }
      outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
-      if (outstandingProduceRequests.size > 0) {
+      if (outstandingProduceRequests.nonEmpty) {
        info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
        // back off and update the topic metadata cache before attempting another send operation
         Thread.sleep(config.retryBackoffMs)
@@ -87,7 +87,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
         producerStats.resendRate.mark()
       }
     }
-    if(outstandingProduceRequests.size > 0) {
+    if(outstandingProduceRequests.nonEmpty) {
       producerStats.failedSendRate.mark()
       val correlationIdEnd = correlationId.get()
       error("Failed to send requests for topics %s with correlation ids in 
[%d,%d]"
@@ -261,9 +261,9 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
    */
  private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) = {
    if(brokerId < 0) {
-      warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.map(_._1).mkString(",")))
+      warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.keys.mkString(",")))
      messagesPerTopic.keys.toSeq
-    } else if(messagesPerTopic.size > 0) {
+    } else if(messagesPerTopic.nonEmpty) {
      val currentCorrelationId = correlationId.getAndIncrement
      val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,
         config.requestTimeoutMs, messagesPerTopic)
@@ -285,7 +285,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
           }
          val failedPartitionsAndStatus = response.status.filter(_._2.error != Errors.NONE.code).toSeq
          failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1)
-          if(failedTopicPartitions.size > 0) {
+          if(failedTopicPartitions.nonEmpty) {
             val errorString = failedPartitionsAndStatus
               .sortWith((p1, p2) => p1._1.topic.compareTo(p2._1.topic) < 0 ||
                                    (p1._1.topic.compareTo(p2._1.topic) == 0 && p1._1.partition < p2._1.partition))
@@ -302,7 +302,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       } catch {
         case t: Throwable =>
           warn("Failed to send producer request with correlation id %d to 
broker %d with data for partitions %s"
-            .format(currentCorrelationId, brokerId, 
messagesPerTopic.map(_._1).mkString(",")), t)
+            .format(currentCorrelationId, brokerId, 
messagesPerTopic.keys.mkString(",")), t)
           messagesPerTopic.keys.toSeq
       }
     } else {

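Both warn calls in this file now use keys rather than map(_._1), avoiding an intermediate collection built only to join the key names. A sketch of the difference (data is hypothetical):

  val messagesPerTopic = Map("t0" -> 3, "t1" -> 5)
  // keys exposes the map's keys directly; map(_._1) would copy them first.
  println(messagesPerTopic.keys.mkString(","))
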
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 18fff45..a36a07d 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -154,7 +154,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   }
 
  def isSuperUser(operation: Operation, resource: Resource, principal: KafkaPrincipal, host: String): Boolean = {
-    if (superUsers.exists( _ == principal)) {
+    if (superUsers.contains(principal)) {
      authorizerLogger.debug(s"principal = $principal is a super user, allowing operation without checking acls.")
       true
     } else false
@@ -275,7 +275,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
       val newAcls = getNewAcls(currentVersionedAcls.acls)
       val data = Json.encode(Acl.toJsonCompatibleMap(newAcls))
       val (updateSucceeded, updateVersion) =
-        if (!newAcls.isEmpty) {
+        if (newAcls.nonEmpty) {
          updatePath(path, data, currentVersionedAcls.zkVersion)
         } else {
           trace(s"Deleting path for $resource because it had no ACLs 
remaining")
@@ -285,7 +285,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
       if (!updateSucceeded) {
         trace(s"Failed to update ACLs for $resource. Used version 
${currentVersionedAcls.zkVersion}. Reading data and retrying update.")
         Thread.sleep(backoffTime)
-        currentVersionedAcls = getAclsFromZk(resource);
+        currentVersionedAcls = getAclsFromZk(resource)
         retries += 1
       } else {
         newVersionedAcls = VersionedAcls(newAcls, updateVersion)

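superUsers.exists(_ == principal) and superUsers.contains(principal) are equivalent, but contains names the membership test and, on a Set, is a direct lookup rather than a predicate scan. A standalone sketch (values are hypothetical):

  val superUsers = Set("User:alice", "User:bob")
  if (superUsers.contains("User:alice"))
    println("super user, allowing operation without checking acls")
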
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 5863c72..c99ba97 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -157,7 +157,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   * Returns the quota for the specified clientId
   */
  def quota(clientId: String): Quota =
-    if (overriddenQuota.containsKey(clientId)) overriddenQuota.get(clientId) else defaultQuota;
+    if (overriddenQuota.containsKey(clientId)) overriddenQuota.get(clientId) else defaultQuota
 
   /*
   * This function either returns the sensors for a given client id or creates them if they don't exist

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 2205568..5248edf 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -175,7 +175,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
    * @return true iff the delayed operations can be completed by the caller
    */
   def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
-    assert(watchKeys.size > 0, "The watch key list can't be empty")
+    assert(watchKeys.nonEmpty, "The watch key list can't be empty")
 
    // The cost of tryComplete() is typically proportional to the number of keys. Calling
    // tryComplete() for each key is going to be expensive if there are many keys. Instead,

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1edc162..ebd1732 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -293,7 +293,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         val currentTimestamp = SystemTime.milliseconds
         val defaultExpireTimestamp = offsetRetention + currentTimestamp
         val partitionData = authorizedRequestInfo.mapValues { partitionData =>
-          val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata;
+          val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata
           new OffsetAndMetadata(
             offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
             commitTimestamp = currentTimestamp,

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index a664484..ca66f9d 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1002,7 +1002,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
    require(logRollTimeMillis >= 1, "log.roll.ms must be equal or greater than 1")
    require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be equal or greater than 0")
    require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, equal or greater than 1")
-    require(logDirs.size > 0)
+    require(logDirs.nonEmpty)
    require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
    require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" +
      " to prevent unnecessary socket timeouts")

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index f95d9ef..994e28e 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -475,7 +475,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
              response = channel.receive()
              val shutdownResponse = kafka.api.ControlledShutdownResponse.readFrom(response.payload())
              if (shutdownResponse.errorCode == Errors.NONE.code && shutdownResponse.partitionsRemaining != null &&
-                shutdownResponse.partitionsRemaining.size == 0) {
+                shutdownResponse.partitionsRemaining.isEmpty) {
                 shutdownSucceeded = true
                 info ("Controlled shutdown succeeded")
               }
@@ -649,7 +649,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
        s"Configured broker.id $brokerId doesn't match stored broker.id ${brokerIdSet.last} in meta.properties. " +
        s"If you moved your data, make sure your configured broker.id matches. " +
        s"If you intend to create a new broker, you should remove all data in your data directories (log.dirs).")
-    else if(brokerIdSet.size == 0 && brokerId < 0 && config.brokerIdGenerationEnable)  // generate a new brokerId from Zookeeper
+    else if(brokerIdSet.isEmpty && brokerId < 0 && config.brokerIdGenerationEnable)  // generate a new brokerId from Zookeeper
       brokerId = generateBrokerId
     else if(brokerIdSet.size == 1) // pick broker.id from meta.properties
       brokerId = brokerIdSet.last

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 68f2385..447fb40 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -359,9 +359,8 @@ class ReplicaManager(val config: KafkaConfig,
       // Just return an error and don't handle the request at all
       val responseStatus = messagesPerPartition.map {
         case (topicAndPartition, messageSet) =>
-          (topicAndPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
-                                                      LogAppendInfo.UnknownLogAppendInfo.firstOffset,
-                                                      Message.NoTimestamp))
+          topicAndPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
+            LogAppendInfo.UnknownLogAppendInfo.firstOffset, Message.NoTimestamp)
       }
       responseCallback(responseStatus)
     }
@@ -375,7 +374,7 @@ class ReplicaManager(val config: KafkaConfig,
  private def delayedRequestRequired(requiredAcks: Short, messagesPerPartition: Map[TopicPartition, MessageSet],
                                       localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
     requiredAcks == -1 &&
-    messagesPerPartition.size > 0 &&
+    messagesPerPartition.nonEmpty &&
    localProduceResults.values.count(_.error.isDefined) < messagesPerPartition.size
   }
 
@@ -639,13 +638,13 @@ class ReplicaManager(val config: KafkaConfig,
        val partitionsTobeLeader = partitionState.filter { case (partition, stateInfo) =>
           stateInfo.leader == config.brokerId
         }
-        val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)
+        val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys
 
-        val partitionsBecomeLeader = if (!partitionsTobeLeader.isEmpty)
+        val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
          makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
         else
           Set.empty[Partition]
-        val partitionsBecomeFollower = if (!partitionsToBeFollower.isEmpty)
+        val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache)
         else
           Set.empty[Partition]

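In the first hunk above, key -> value already constructs the tuple, so the parentheses around the whole expression were redundant; the same goes for the grouping around "partitionState -- partitionsTobeLeader.keys". A small sketch of the arrow association (types simplified, not Kafka code):

  val errorCodes = Map("p0" -> 1, "p1" -> 2)
  val responses = errorCodes.map { case (partition, code) =>
    partition -> s"error=$code"  // -> builds the pair; no extra parentheses needed
  }
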
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 6480ff5..8e5dcc8 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -91,7 +91,7 @@ object ConsumerPerformance {
     val elapsedSecs = (endMs - startMs) / 1000.0
     if (!config.showDetailedStats) {
       val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024)
-      println(("%s, %s, %.4f, %.4f, %d, 
%.4f").format(config.dateFormat.format(startMs), 
config.dateFormat.format(endMs),
+      println("%s, %s, %.4f, %.4f, %d, 
%.4f".format(config.dateFormat.format(startMs), config.dateFormat.format(endMs),
         totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get, 
totalMessagesRead.get / elapsedSecs))
     }
   }
@@ -156,7 +156,7 @@ object ConsumerPerformance {
     val elapsedMs: Double = endMs - startMs
     val totalMBRead = (bytesRead * 1.0) / (1024 * 1024)
     val mbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
-    println(("%s, %d, %.4f, %.4f, %d, %.4f").format(dateFormat.format(endMs), 
id, totalMBRead,
+    println("%s, %d, %.4f, %.4f, %d, %.4f".format(dateFormat.format(endMs), 
id, totalMBRead,
         1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - 
lastMessagesRead) / elapsedMs) * 1000.0))
   }
 

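format is available on any string literal through the standard library's string enrichment, so the parentheses around the format strings in these println calls added nothing. A one-line sketch (values are hypothetical):

  println("%s, %.4f MB, %.1f s".format("consumer-1", 12.5, 3.2))
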
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/tools/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index cf2000b..d4c0f34 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -63,7 +63,7 @@ object ProducerPerformance extends Logging {
     val endMs = System.currentTimeMillis
     val elapsedSecs = (endMs - startMs) / 1000.0
     val totalMBSent = (totalBytesSent.get * 1.0) / (1024 * 1024)
-    println(("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f").format(
+    println("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f".format(
       config.dateFormat.format(startMs), config.dateFormat.format(endMs),
      config.compressionCodec.codec, config.messageSize, config.batchSize, totalMBSent,
      totalMBSent / elapsedSecs, totalMessagesSent.get, totalMessagesSent.get / elapsedSecs))

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 71bf0c0..9a059df 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -180,7 +180,7 @@ object ReplicaVerificationTool extends Logging {
                            fetchSize = fetchSize,
                            maxWait = maxWaitMs,
                            minBytes = 1,
-                           doVerification = (brokerId == verificationBrokerId))
+                           doVerification = brokerId == verificationBrokerId)
     }
 
     Runtime.getRuntime.addShutdownHook(new Thread() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
index 5e3c605..3abbc40 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
@@ -92,7 +92,7 @@ object SimpleConsumerPerformance {
           val reportTime = System.currentTimeMillis
           val elapsed = (reportTime - lastReportTime)/1000.0
           val totalMBRead = ((totalBytesRead-lastBytesRead)*1.0)/(1024*1024)
-          println(("%s, %d, %.4f, %.4f, %d, 
%.4f").format(config.dateFormat.format(reportTime), config.fetchSize,
+          println("%s, %d, %.4f, %.4f, %d, 
%.4f".format(config.dateFormat.format(reportTime), config.fetchSize,
             (totalBytesRead*1.0)/(1024*1024), totalMBRead/elapsed,
             totalMessagesRead, (totalMessagesRead-lastMessagesRead)/elapsed))
         }
@@ -107,7 +107,7 @@ object SimpleConsumerPerformance {
 
     if(!config.showDetailedStats) {
       val totalMBRead = (totalBytesRead*1.0)/(1024*1024)
-      println(("%s, %s, %d, %.4f, %.4f, %d, 
%.4f").format(config.dateFormat.format(startMs),
+      println("%s, %s, %d, %.4f, %.4f, %d, 
%.4f".format(config.dateFormat.format(startMs),
         config.dateFormat.format(reportTime), config.fetchSize, totalMBRead, 
totalMBRead/elapsed,
         totalMessagesRead, totalMessagesRead/elapsed))
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 6ad68b6..c975d24 100755
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -131,15 +131,15 @@ object SimpleConsumerShell extends Logging {
     ToolsUtils.validatePortOrDie(parser,brokerList)
     val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
-    if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
+    if(topicsMetadata.size != 1 || !topicsMetadata.head.topic.equals(topic)) {
      System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
       System.exit(1)
     }
 
     // validating partition id
-    val partitionsMetadata = topicsMetadata(0).partitionsMetadata
+    val partitionsMetadata = topicsMetadata.head.partitionsMetadata
    val partitionMetadataOpt = partitionsMetadata.find(p => p.partitionId == partitionId)
-    if (!partitionMetadataOpt.isDefined) {
+    if (partitionMetadataOpt.isEmpty) {
      System.err.println("Error: partition %d does not exist for topic %s".format(partitionId, topic))
       System.exit(1)
     }
@@ -149,7 +149,7 @@ object SimpleConsumerShell extends Logging {
     var replicaOpt: Option[BrokerEndPoint] = null
     if (replicaId == UseLeaderReplica) {
       replicaOpt = partitionMetadataOpt.get.leader
-      if (!replicaOpt.isDefined) {
+      if (replicaOpt.isEmpty) {
         System.err.println("Error: user specifies to fetch from leader for 
partition (%s, %d) which has not been elected yet".format(topic, partitionId))
         System.exit(1)
       }
@@ -157,7 +157,7 @@ object SimpleConsumerShell extends Logging {
     else {
       val replicasForPartition = partitionMetadataOpt.get.replicas
       replicaOpt = replicasForPartition.find(r => r.id == replicaId)
-      if(!replicaOpt.isDefined) {
+      if(replicaOpt.isEmpty) {
         System.err.println("Error: replica %d does not exist for partition 
(%s, %d)".format(replicaId, topic, partitionId))
         System.exit(1)
       }

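The hunks above swap indexed access topicsMetadata(0) for head and the double negative !opt.isDefined for opt.isEmpty. A compact sketch of both (values are hypothetical):

  val topicsMetadata = Seq("topicA")
  val first = topicsMetadata.head  // clearer than topicsMetadata(0)

  val leader: Option[Int] = None
  if (leader.isEmpty)              // reads better than !leader.isDefined
    println("no leader elected yet")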