This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2ed09db6c4 MINOR: Scala cleanups in core (#12058)
2ed09db6c4 is described below

commit 2ed09db6c497185584e7e33044d0ce1945a61681
Author: Mickael Maison <[email protected]>
AuthorDate: Wed Apr 20 15:10:46 2022 +0200

    MINOR: Scala cleanups in core (#12058)
    
    
    Reviewers: Luke Chen <[email protected]>, Divij Vaidya <[email protected]>
---
 .../kafka/coordinator/group/GroupCoordinator.scala |  2 +-
 ...TransactionMarkerRequestCompletionHandler.scala |  2 +-
 core/src/main/scala/kafka/log/AbstractIndex.scala  |  2 +-
 core/src/main/scala/kafka/log/LocalLog.scala       |  8 ++---
 core/src/main/scala/kafka/log/LogCleaner.scala     |  2 +-
 core/src/main/scala/kafka/log/LogConfig.scala      |  4 +--
 core/src/main/scala/kafka/log/LogLoader.scala      |  2 +-
 core/src/main/scala/kafka/log/OffsetIndex.scala    |  2 +-
 core/src/main/scala/kafka/log/TimeIndex.scala      |  2 +-
 .../kafka/metrics/KafkaCSVMetricsReporter.scala    |  2 +-
 .../scala/kafka/metrics/KafkaMetricsGroup.scala    |  2 +-
 .../kafka/metrics/LinuxIoMetricsCollector.scala    |  4 +--
 .../main/scala/kafka/raft/KafkaMetadataLog.scala   |  4 +--
 .../scala/kafka/raft/KafkaNetworkChannel.scala     |  2 +-
 .../kafka/server/BrokerLifecycleManager.scala      | 42 +++++++++++-----------
 .../src/main/scala/kafka/server/BrokerServer.scala |  4 +--
 .../scala/kafka/server/ClientQuotaManager.scala    |  6 ++--
 .../main/scala/kafka/server/ConfigHandler.scala    |  4 +--
 .../main/scala/kafka/server/ControllerApis.scala   |  2 +-
 .../main/scala/kafka/server/ControllerServer.scala |  2 +-
 .../kafka/server/DelegationTokenManager.scala      | 12 +++----
 .../src/main/scala/kafka/server/FetchSession.scala |  2 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   | 14 ++++----
 core/src/main/scala/kafka/server/KafkaServer.scala |  4 +--
 .../scala/kafka/server/PartitionMetadataFile.scala |  5 ++-
 .../main/scala/kafka/server/ReplicaManager.scala   | 25 +++++++------
 .../kafka/server/ReplicationQuotaManager.scala     |  2 +-
 .../server/metadata/BrokerMetadataListener.scala   | 18 +++++-----
 .../server/metadata/BrokerMetadataPublisher.scala  | 12 +++----
 .../metadata/BrokerMetadataSnapshotter.scala       |  2 +-
 .../metadata/ClientQuotaMetadataManager.scala      | 12 +++----
 .../kafka/server/metadata/KRaftMetadataCache.scala | 22 ++++++------
 .../kafka/server/metadata/ZkConfigRepository.scala |  2 +-
 .../main/scala/kafka/utils/CommandLineUtils.scala  |  4 +--
 core/src/main/scala/kafka/utils/Exit.scala         |  2 +-
 core/src/main/scala/kafka/utils/FileLock.scala     |  2 +-
 .../main/scala/kafka/utils/KafkaScheduler.scala    |  2 +-
 core/src/main/scala/kafka/utils/ToolsUtils.scala   |  2 +-
 core/src/main/scala/kafka/utils/VersionInfo.scala  |  2 +-
 39 files changed, 121 insertions(+), 125 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 22d82dfa6d..6bf337d679 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -1104,7 +1104,7 @@ class GroupCoordinator(val brokerId: Int,
         groupId != null
       case _ =>
         // The remaining APIs are groups using Kafka for group coordination 
and must have a non-empty groupId
-        groupId != null && !groupId.isEmpty
+        groupId != null && groupId.nonEmpty
     }
   }
 
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index 848e0fa65c..7a59139b17 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -89,7 +89,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
 
       val writeTxnMarkerResponse = 
response.responseBody.asInstanceOf[WriteTxnMarkersResponse]
 
-      val responseErrors = writeTxnMarkerResponse.errorsByProducerId;
+      val responseErrors = writeTxnMarkerResponse.errorsByProducerId
       for (txnIdAndMarker <- txnIdAndMarkerEntries.asScala) {
         val transactionalId = txnIdAndMarker.txnId
         val txnMarker = txnIdAndMarker.txnMarkerEntry
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala 
b/core/src/main/scala/kafka/log/AbstractIndex.scala
index 31b9f6d8dd..37cd4b9f55 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -188,7 +188,7 @@ abstract class AbstractIndex(@volatile private var _file: 
File, val baseOffset:
             safeForceUnmap()
           raf.setLength(roundedNewSize)
           _length = roundedNewSize
-          mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 
roundedNewSize)
+          mmap = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, 
roundedNewSize)
           _maxEntries = mmap.limit() / entrySize
           mmap.position(position)
           debug(s"Resized ${file.getAbsolutePath} to $roundedNewSize, position 
is ${mmap.position()} " +
diff --git a/core/src/main/scala/kafka/log/LocalLog.scala 
b/core/src/main/scala/kafka/log/LocalLog.scala
index 620f33119a..b0e7b0e446 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -86,7 +86,7 @@ class LocalLog(@volatile private var _dir: File,
 
   private[log] def dir: File = _dir
 
-  private[log] def name: String = dir.getName()
+  private[log] def name: String = dir.getName
 
   private[log] def parentDir: String = _parentDir
 
@@ -657,9 +657,9 @@ object LocalLog extends Logging {
    */
   private[log] def logDeleteDirName(topicPartition: TopicPartition): String = {
     val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
-    val suffix = 
s"-${topicPartition.partition()}.${uniqueId}${DeleteDirSuffix}"
+    val suffix = s"-${topicPartition.partition()}.$uniqueId$DeleteDirSuffix"
     val prefixLength = Math.min(topicPartition.topic().size, 255 - suffix.size)
-    s"${topicPartition.topic().substring(0, prefixLength)}${suffix}"
+    s"${topicPartition.topic().substring(0, prefixLength)}$suffix"
   }
 
   /**
@@ -931,7 +931,7 @@ object LocalLog extends Logging {
     // if we crash in the middle of this we complete the swap in loadSegments()
     if (!isRecoveredSwapFile)
       
sortedNewSegments.reverse.foreach(_.changeFileSuffixes(CleanedFileSuffix, 
SwapFileSuffix))
-    sortedNewSegments.reverse.foreach(existingSegments.add(_))
+    sortedNewSegments.reverse.foreach(existingSegments.add)
     val newSegmentBaseOffsets = sortedNewSegments.map(_.baseOffset).toSet
 
     // delete the old files
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala 
b/core/src/main/scala/kafka/log/LogCleaner.scala
index 55d795243d..df20a4a36e 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -486,7 +486,7 @@ private[log] class Cleaner(val id: Int,
   /* buffer used for write i/o */
   private var writeBuffer = ByteBuffer.allocate(ioBufferSize)
 
-  private val decompressionBufferSupplier = BufferSupplier.create();
+  private val decompressionBufferSupplier = BufferSupplier.create()
 
   require(offsetMap.slots * dupBufferLoadFactor > 1, "offset map is too small 
to fit in even a single message, so log cleaning will never make progress. You 
can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads")
 
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala 
b/core/src/main/scala/kafka/log/LogConfig.scala
index e0a2cb463e..3f70f88320 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -145,7 +145,7 @@ case class LogConfig(props: java.util.Map[_, _], 
overriddenConfigs: Set[String]
         }
 
         if (localLogRetentionBytes > retentionSize) {
-          throw new ConfigException(LogConfig.LocalLogRetentionBytesProp, 
localLogRetentionBytes, s"Value must not be more than property: 
${LogConfig.RetentionBytesProp} value.");
+          throw new ConfigException(LogConfig.LocalLogRetentionBytesProp, 
localLogRetentionBytes, s"Value must not be more than property: 
${LogConfig.RetentionBytesProp} value.")
         }
 
         localLogRetentionBytes
@@ -268,7 +268,7 @@ object LogConfig {
 
   private[log] val ServerDefaultHeaderName = "Server Default Property"
 
-  val configsWithNoServerDefaults: Set[String] = 
Set(RemoteLogStorageEnableProp, LocalLogRetentionMsProp, 
LocalLogRetentionBytesProp);
+  val configsWithNoServerDefaults: Set[String] = 
Set(RemoteLogStorageEnableProp, LocalLogRetentionMsProp, 
LocalLogRetentionBytesProp)
 
   // Package private for testing
   private[log] class LogConfigDef(base: ConfigDef) extends ConfigDef(base) {
diff --git a/core/src/main/scala/kafka/log/LogLoader.scala 
b/core/src/main/scala/kafka/log/LogLoader.scala
index eb9dec7a58..581d016e5e 100644
--- a/core/src/main/scala/kafka/log/LogLoader.scala
+++ b/core/src/main/scala/kafka/log/LogLoader.scala
@@ -392,7 +392,7 @@ class LogLoader(
           Some(logEndOffset)
         else {
           warn(s"Deleting all segments because logEndOffset ($logEndOffset) " +
-            s"is smaller than logStartOffset ${logStartOffsetCheckpoint}. " +
+            s"is smaller than logStartOffset $logStartOffsetCheckpoint. " +
             "This could happen if segment files were deleted from the file 
system.")
           removeAndDeleteSegmentsAsync(segments.values)
           leaderEpochCache.foreach(_.clearAndFlush())
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala 
b/core/src/main/scala/kafka/log/OffsetIndex.scala
index a4183b1715..62afbac930 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -156,7 +156,7 @@ class OffsetIndex(_file: File, baseOffset: Long, 
maxIndexSize: Int = -1, writabl
     }
   }
 
-  override def truncate() = truncateToEntries(0)
+  override def truncate(): Unit = truncateToEntries(0)
 
   override def truncateTo(offset: Long): Unit = {
     inLock(lock) {
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala 
b/core/src/main/scala/kafka/log/TimeIndex.scala
index 779a45138b..2c464d602f 100644
--- a/core/src/main/scala/kafka/log/TimeIndex.scala
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -159,7 +159,7 @@ class TimeIndex(_file: File, baseOffset: Long, 
maxIndexSize: Int = -1, writable:
     }
   }
 
-  override def truncate() = truncateToEntries(0)
+  override def truncate(): Unit = truncateToEntries(0)
 
   /**
    * Remove all entries from the index which have an offset greater than or 
equal to the given offset.
diff --git a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala 
b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
index fa3c45a494..607cd188e0 100755
--- a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
@@ -51,7 +51,7 @@ private class KafkaCSVMetricsReporter extends 
KafkaMetricsReporter
         val metricsConfig = new KafkaMetricsConfig(props)
         csvDir = new File(props.getString("kafka.csv.metrics.dir", 
"kafka_metrics"))
         Utils.delete(csvDir)
-        Files.createDirectories(csvDir.toPath())
+        Files.createDirectories(csvDir.toPath)
         underlying = new CsvReporter(KafkaYammerMetrics.defaultRegistry(), 
csvDir)
         if (props.getBoolean("kafka.csv.metrics.reporter.enabled", default = 
false)) {
           initialized = true
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala 
b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index d174f3ea83..161d1f2f36 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -53,7 +53,7 @@ trait KafkaMetricsGroup extends Logging {
 
     nameBuilder.append(typeName)
 
-    if (name.length > 0) {
+    if (name.nonEmpty) {
       nameBuilder.append(",name=")
       nameBuilder.append(name)
     }
diff --git a/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala 
b/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
index 17de008580..5a41dbad73 100644
--- a/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
+++ b/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
@@ -87,10 +87,10 @@ class LinuxIoMetricsCollector(procRoot: String, val time: 
Time, val logger: Logg
   }
 
   def usable(): Boolean = {
-    if (path.toFile().exists()) {
+    if (path.toFile.exists()) {
       updateValues(time.milliseconds())
     } else {
-      logger.debug(s"disabling IO metrics collection because ${path} does not 
exist.")
+      logger.debug(s"disabling IO metrics collection because $path does not 
exist.")
       false
     }
   }
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala 
b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index 1b0aef3fed..dba8975d43 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -368,7 +368,7 @@ final class KafkaMetadataLog private (
       val firstBatch = batchIterator.next()
       val records = firstBatch.streamingIterator(new 
BufferSupplier.GrowableBufferSupplier())
       if (firstBatch.isControlBatch) {
-        val header = 
ControlRecordUtils.deserializedSnapshotHeaderRecord(records.next());
+        val header = 
ControlRecordUtils.deserializedSnapshotHeaderRecord(records.next())
         Some(header.lastContainedLogTimestamp())
       } else {
         warn("Did not find control record at beginning of snapshot")
@@ -405,7 +405,7 @@ final class KafkaMetadataLog private (
    *
    * For the given predicate, we are testing if the snapshot identified by the 
first argument should be deleted.
    */
-  private def cleanSnapshots(predicate: (OffsetAndEpoch) => Boolean): Boolean 
= {
+  private def cleanSnapshots(predicate: OffsetAndEpoch => Boolean): Boolean = {
     if (snapshots.size < 2)
       return false
 
diff --git a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala 
b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
index d99039132d..c44d57102c 100644
--- a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
+++ b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
@@ -45,7 +45,7 @@ object KafkaNetworkChannel {
         // Since we already have the request, we go through a simplified 
builder
         new AbstractRequest.Builder[FetchRequest](ApiKeys.FETCH) {
           override def build(version: Short): FetchRequest = new 
FetchRequest(fetchRequest, version)
-          override def toString(): String = fetchRequest.toString
+          override def toString: String = fetchRequest.toString
         }
       case fetchSnapshotRequest: FetchSnapshotRequestData =>
         new FetchSnapshotRequest.Builder(fetchSnapshotRequest)
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala 
b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index 394c353e45..3ca2704eb3 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -264,7 +264,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
         new DeadlineFunction(time.nanoseconds() + initialTimeoutNs),
         new RegistrationTimeoutEvent())
       sendBrokerRegistration()
-      info(s"Incarnation ${incarnationId} of broker ${nodeId} in cluster 
${clusterId} " +
+      info(s"Incarnation $incarnationId of broker $nodeId in cluster 
$clusterId " +
         "is now STARTING.")
     }
   }
@@ -285,7 +285,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
         setListeners(_advertisedListeners).
         setRack(rack.orNull)
     if (isDebugEnabled) {
-      debug(s"Sending broker registration ${data}")
+      debug(s"Sending broker registration $data")
     }
     _channelManager.sendRequest(new BrokerRegistrationRequest.Builder(data),
       new BrokerRegistrationResponseHandler())
@@ -294,18 +294,18 @@ class BrokerLifecycleManager(val config: KafkaConfig,
   private class BrokerRegistrationResponseHandler extends 
ControllerRequestCompletionHandler {
     override def onComplete(response: ClientResponse): Unit = {
       if (response.authenticationException() != null) {
-        error(s"Unable to register broker ${nodeId} because of an 
authentication exception.",
-          response.authenticationException());
+        error(s"Unable to register broker $nodeId because of an authentication 
exception.",
+          response.authenticationException())
         scheduleNextCommunicationAfterFailure()
       } else if (response.versionMismatch() != null) {
-        error(s"Unable to register broker ${nodeId} because of an API version 
problem.",
-          response.versionMismatch());
+        error(s"Unable to register broker $nodeId because of an API version 
problem.",
+          response.versionMismatch())
         scheduleNextCommunicationAfterFailure()
       } else if (response.responseBody() == null) {
-        warn(s"Unable to register broker ${nodeId}.")
+        warn(s"Unable to register broker $nodeId.")
         scheduleNextCommunicationAfterFailure()
       } else if 
(!response.responseBody().isInstanceOf[BrokerRegistrationResponse]) {
-        error(s"Unable to register broker ${nodeId} because the controller 
returned an " +
+        error(s"Unable to register broker $nodeId because the controller 
returned an " +
           "invalid response type.")
         scheduleNextCommunicationAfterFailure()
       } else {
@@ -316,11 +316,11 @@ class BrokerLifecycleManager(val config: KafkaConfig,
           _brokerEpoch = message.data().brokerEpoch()
           registered = true
           initialRegistrationSucceeded = true
-          info(s"Successfully registered broker ${nodeId} with broker epoch 
${_brokerEpoch}")
+          info(s"Successfully registered broker $nodeId with broker epoch 
${_brokerEpoch}")
           scheduleNextCommunicationImmediately() // Immediately send a 
heartbeat
         } else {
-          info(s"Unable to register broker ${nodeId} because the controller 
returned " +
-            s"error ${errorCode}")
+          info(s"Unable to register broker $nodeId because the controller 
returned " +
+            s"error $errorCode")
           scheduleNextCommunicationAfterFailure()
         }
       }
@@ -341,7 +341,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
       setWantFence(!readyToUnfence).
       setWantShutDown(_state == BrokerState.PENDING_CONTROLLED_SHUTDOWN)
     if (isTraceEnabled) {
-      trace(s"Sending broker heartbeat ${data}")
+      trace(s"Sending broker heartbeat $data")
     }
     _channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data),
       new BrokerHeartbeatResponseHandler())
@@ -350,18 +350,18 @@ class BrokerLifecycleManager(val config: KafkaConfig,
   private class BrokerHeartbeatResponseHandler extends 
ControllerRequestCompletionHandler {
     override def onComplete(response: ClientResponse): Unit = {
       if (response.authenticationException() != null) {
-        error(s"Unable to send broker heartbeat for ${nodeId} because of an " +
-          "authentication exception.", response.authenticationException());
+        error(s"Unable to send broker heartbeat for $nodeId because of an " +
+          "authentication exception.", response.authenticationException())
         scheduleNextCommunicationAfterFailure()
       } else if (response.versionMismatch() != null) {
-        error(s"Unable to send broker heartbeat for ${nodeId} because of an 
API " +
-          "version problem.", response.versionMismatch());
+        error(s"Unable to send broker heartbeat for $nodeId because of an API 
" +
+          "version problem.", response.versionMismatch())
         scheduleNextCommunicationAfterFailure()
       } else if (response.responseBody() == null) {
-        warn(s"Unable to send broker heartbeat for ${nodeId}. Retrying.")
+        warn(s"Unable to send broker heartbeat for $nodeId. Retrying.")
         scheduleNextCommunicationAfterFailure()
       } else if 
(!response.responseBody().isInstanceOf[BrokerHeartbeatResponse]) {
-        error(s"Unable to send broker heartbeat for ${nodeId} because the 
controller " +
+        error(s"Unable to send broker heartbeat for $nodeId because the 
controller " +
           "returned an invalid response type.")
         scheduleNextCommunicationAfterFailure()
       } else {
@@ -371,7 +371,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
           failedAttempts = 0
           _state match {
             case BrokerState.STARTING =>
-              if (message.data().isCaughtUp()) {
+              if (message.data().isCaughtUp) {
                 info(s"The broker has caught up. Transitioning from STARTING 
to RECOVERY.")
                 _state = BrokerState.RECOVERY
                 initialCatchUpFuture.complete(null)
@@ -382,7 +382,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
               // there is no recovery work to be done, we start up a bit 
quicker.
               scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS))
             case BrokerState.RECOVERY =>
-              if (!message.data().isFenced()) {
+              if (!message.data().isFenced) {
                 info(s"The broker has been unfenced. Transitioning from 
RECOVERY to RUNNING.")
                 _state = BrokerState.RUNNING
               } else {
@@ -417,7 +417,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
               scheduleNextCommunicationAfterSuccess()
           }
         } else {
-          warn(s"Broker ${nodeId} sent a heartbeat request but received error 
${errorCode}.")
+          warn(s"Broker $nodeId sent a heartbeat request but received error 
$errorCode.")
           scheduleNextCommunicationAfterFailure()
         }
       }
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index c537d99b0f..8fdc59d945 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -64,8 +64,8 @@ class BrokerSnapshotWriterBuilder(raftClient: 
RaftClient[ApiMessageAndVersion])
     raftClient.createSnapshot(committedOffset, committedEpoch, 
lastContainedLogTime).
         asScala.getOrElse(
       throw new RuntimeException("A snapshot already exists with " +
-        s"committedOffset=${committedOffset}, 
committedEpoch=${committedEpoch}, " +
-        s"lastContainedLogTime=${lastContainedLogTime}")
+        s"committedOffset=$committedOffset, committedEpoch=$committedEpoch, " +
+        s"lastContainedLogTime=$lastContainedLogTime")
     )
   }
 }
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala 
b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 7334519986..ee7c70bec9 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -593,7 +593,7 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
       if (sanitizedUser != null && clientId != null) {
         val userEntity = Some(UserEntity(sanitizedUser))
         val clientIdEntity = Some(ClientIdEntity(clientId))
-        if (!sanitizedUser.isEmpty && !clientId.isEmpty) {
+        if (sanitizedUser.nonEmpty && clientId.nonEmpty) {
           // /config/users/<user>/clients/<client-id>
           quota = overriddenQuotas.get(KafkaQuotaEntity(userEntity, 
clientIdEntity))
           if (quota == null) {
@@ -608,14 +608,14 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
             // /config/users/<default>/clients/<default>
             quota = overriddenQuotas.get(DefaultUserClientIdQuotaEntity)
           }
-        } else if (!sanitizedUser.isEmpty) {
+        } else if (sanitizedUser.nonEmpty) {
           // /config/users/<user>
           quota = overriddenQuotas.get(KafkaQuotaEntity(userEntity, None))
           if (quota == null) {
             // /config/users/<default>
             quota = overriddenQuotas.get(DefaultUserQuotaEntity)
           }
-        } else if (!clientId.isEmpty) {
+        } else if (clientId.nonEmpty) {
           // /config/clients/<client-id>
           quota = overriddenQuotas.get(KafkaQuotaEntity(None, clientIdEntity))
           if (quota == null) {
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala 
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 2fe49ad17b..ca6a42b824 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -64,7 +64,7 @@ class TopicConfigHandler(private val logManager: LogManager, 
kafkaConfig: KafkaC
     }
     logManager.updateTopicConfig(topic, props)
 
-    def updateThrottledList(prop: String, quotaManager: 
ReplicationQuotaManager) = {
+    def updateThrottledList(prop: String, quotaManager: 
ReplicationQuotaManager): Unit = {
       if (topicConfig.containsKey(prop) && 
topicConfig.getProperty(prop).nonEmpty) {
         val partitions = parseThrottledPartitions(topicConfig, 
kafkaConfig.brokerId, prop)
         quotaManager.markThrottled(topic, partitions)
@@ -177,7 +177,7 @@ class UserConfigHandler(private val quotaManagers: 
QuotaManagers, val credential
     val sanitizedUser = entities(0)
     val sanitizedClientId = if (entities.length == 3) Some(entities(2)) else 
None
     updateQuotaConfig(Some(sanitizedUser), sanitizedClientId, config)
-    if (!sanitizedClientId.isDefined && sanitizedUser != 
ConfigEntityName.Default)
+    if (sanitizedClientId.isEmpty && sanitizedUser != ConfigEntityName.Default)
       
credentialProvider.updateCredentials(Sanitizer.desanitize(sanitizedUser), 
config)
   }
 }
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala 
b/core/src/main/scala/kafka/server/ControllerApis.scala
index 064974b7db..3bd4ca5fce 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -112,7 +112,7 @@ class ControllerApis(val requestChannel: RequestChannel,
     } catch {
       case e: FatalExitError => throw e
       case e: Throwable => {
-        val t = if (e.isInstanceOf[ExecutionException]) e.getCause() else e
+        val t = if (e.isInstanceOf[ExecutionException]) e.getCause else e
         error(s"Unexpected error handling request ${request.requestDesc(true)} 
" +
           s"with context ${request.context}", t)
         requestHelper.handleError(request, t)
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 049ad5b8c2..498402370b 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -150,7 +150,7 @@ class ControllerServer(
         socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
           config.controllerListeners.head.listenerName))
       } else {
-        throw new ConfigException("No controller.listener.names defined for 
controller");
+        throw new ConfigException("No controller.listener.names defined for 
controller")
       }
 
       val threadNamePrefixAsString = threadNamePrefix.getOrElse("")
diff --git a/core/src/main/scala/kafka/server/DelegationTokenManager.scala 
b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
index 536a296383..8b96e72543 100644
--- a/core/src/main/scala/kafka/server/DelegationTokenManager.scala
+++ b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
@@ -121,7 +121,7 @@ object DelegationTokenManager {
         require(mainJs(VersionKey).to[Int] == CurrentVersion)
         val owner = 
SecurityUtils.parseKafkaPrincipal(Sanitizer.desanitize(mainJs(OwnerKey).to[String]))
         val renewerStr = mainJs(RenewersKey).to[Seq[String]]
-        val renewers = 
renewerStr.map(Sanitizer.desanitize(_)).map(SecurityUtils.parseKafkaPrincipal(_))
+        val renewers = 
renewerStr.map(Sanitizer.desanitize).map(SecurityUtils.parseKafkaPrincipal)
         val issueTimestamp = mainJs(IssueTimestampKey).to[Long]
         val expiryTimestamp = mainJs(ExpiryTimestampKey).to[Long]
         val maxTimestamp = mainJs(MaxTimestampKey).to[Long]
@@ -140,13 +140,13 @@ object DelegationTokenManager {
 
     val allow =
     //exclude tokens which are not requested
-      if (!owners.isEmpty && !owners.get.exists(owner => 
token.ownerOrRenewer(owner))) {
+      if (owners.isDefined && !owners.get.exists(owner => 
token.ownerOrRenewer(owner))) {
         false
         //Owners and the renewers can describe their own tokens
       } else if (token.ownerOrRenewer(requestedPrincipal)) {
         true
         // Check permission for non-owned tokens
-      } else if ((authorizeToken(token.tokenId))) {
+      } else if (authorizeToken(token.tokenId)) {
         true
       }
       else {
@@ -172,7 +172,7 @@ class DelegationTokenManager(val config: KafkaConfig,
 
   val secretKey = {
     val keyBytes =  if (config.tokenAuthEnabled) 
config.delegationTokenSecretKey.value.getBytes(StandardCharsets.UTF_8) else null
-    if (keyBytes == null || keyBytes.length == 0) null
+    if (keyBytes == null || keyBytes.isEmpty) null
     else
       createSecretKey(keyBytes)
   }
@@ -183,7 +183,7 @@ class DelegationTokenManager(val config: KafkaConfig,
   private val lock = new Object()
   private var tokenChangeListener: ZkNodeChangeNotificationListener = null
 
-  def startup() = {
+  def startup(): Unit = {
     if (config.tokenAuthEnabled) {
       zkClient.createDelegationTokenPaths()
       loadCache()
@@ -192,7 +192,7 @@ class DelegationTokenManager(val config: KafkaConfig,
     }
   }
 
-  def shutdown() = {
+  def shutdown(): Unit = {
     if (config.tokenAuthEnabled) {
       if (tokenChangeListener != null) tokenChangeListener.close()
     }
diff --git a/core/src/main/scala/kafka/server/FetchSession.scala 
b/core/src/main/scala/kafka/server/FetchSession.scala
index f7d348ddc5..b32cb8bcf6 100644
--- a/core/src/main/scala/kafka/server/FetchSession.scala
+++ b/core/src/main/scala/kafka/server/FetchSession.scala
@@ -790,7 +790,7 @@ class FetchManager(private val time: Time,
         new FullFetchContext(time, cache, reqMetadata, fetchData, reqVersion 
>= 13, isFollower)
       }
       debug(s"Created a new full FetchContext with 
${partitionsToLogString(fetchData.keySet)}."+
-        s"${removedFetchSessionStr}${suffix}")
+        s"$removedFetchSessionStr$suffix")
       context
     } else {
       cache.synchronized {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 86295816f1..5793a87dc7 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1226,7 +1226,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     var unauthorizedForCreateTopics = Set[String]()
 
     if (authorizedTopics.nonEmpty) {
-      val nonExistingTopics = 
authorizedTopics.filterNot(metadataCache.contains(_))
+      val nonExistingTopics = 
authorizedTopics.filterNot(metadataCache.contains)
       if (metadataRequest.allowAutoTopicCreation && 
config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
         if (!authHelper.authorize(request.context, CREATE, CLUSTER, 
CLUSTER_NAME, logIfDenied = false)) {
           val authorizedForCreateTopics = 
authHelper.filterByAuthorized(request.context, CREATE, TOPIC,
@@ -1350,7 +1350,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val payloadOpt = 
zkSupport.zkClient.getConsumerOffset(offsetFetchRequest.groupId, topicPartition)
                 payloadOpt match {
                   case Some(payload) =>
-                    (topicPartition, new 
OffsetFetchResponse.PartitionData(payload.toLong,
+                    (topicPartition, new 
OffsetFetchResponse.PartitionData(payload,
                       Optional.empty(), OffsetFetchResponse.NO_METADATA, 
Errors.NONE))
                   case None =>
                     (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
@@ -1615,7 +1615,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val listedGroup = new ListGroupsResponseData.ListedGroup()
                   .setGroupId(group.groupId)
                   .setProtocolType(group.protocolType)
-                  .setGroupState(group.state.toString)
+                  .setGroupState(group.state)
                 listedGroup
             }.asJava)
             .setThrottleTimeMs(throttleMs)
@@ -2156,16 +2156,14 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
         new DeleteRecordsResponse(new DeleteRecordsResponseData()
           .setThrottleTimeMs(requestThrottleMs)
-          .setTopics(new 
DeleteRecordsResponseData.DeleteRecordsTopicResultCollection(mergedResponseStatus.groupBy(_._1.topic).map
 { case (topic, partitionMap) => {
+          .setTopics(new 
DeleteRecordsResponseData.DeleteRecordsTopicResultCollection(mergedResponseStatus.groupBy(_._1.topic).map
 { case (topic, partitionMap) =>
             new DeleteRecordsTopicResult()
               .setName(topic)
-              .setPartitions(new 
DeleteRecordsResponseData.DeleteRecordsPartitionResultCollection(partitionMap.map
 { case (topicPartition, partitionResult) => {
+              .setPartitions(new 
DeleteRecordsResponseData.DeleteRecordsPartitionResultCollection(partitionMap.map
 { case (topicPartition, partitionResult) =>
                 new 
DeleteRecordsPartitionResult().setPartitionIndex(topicPartition.partition)
                   .setLowWatermark(partitionResult.lowWatermark)
                   .setErrorCode(partitionResult.errorCode)
-              }
               }.toList.asJava.iterator()))
-          }
           }.toList.asJava.iterator()))))
     }
 
@@ -3356,7 +3354,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         .setThrottleTimeMs(requestThrottleMs)
         .setClusterId(clusterId)
         .setControllerId(controllerId)
-        .setClusterAuthorizedOperations(clusterAuthorizedOperations);
+        .setClusterAuthorizedOperations(clusterAuthorizedOperations)
 
 
       brokers.foreach { broker =>
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 28cb1cbc6f..d006165577 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -212,7 +212,7 @@ class KafkaServer(
 
         /* Get or create cluster_id */
         _clusterId = getOrGenerateClusterId(zkClient)
-        info(s"Cluster ID = ${clusterId}")
+        info(s"Cluster ID = $clusterId")
 
         /* load metadata */
         val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) =
@@ -227,7 +227,7 @@ class KafkaServer(
         /* check cluster id */
         if (preloadedBrokerMetadataCheckpoint.clusterId.isDefined && 
preloadedBrokerMetadataCheckpoint.clusterId.get != clusterId)
           throw new InconsistentClusterIdException(
-            s"The Cluster ID ${clusterId} doesn't match stored clusterId 
${preloadedBrokerMetadataCheckpoint.clusterId} in meta.properties. " +
+            s"The Cluster ID $clusterId doesn't match stored clusterId 
${preloadedBrokerMetadataCheckpoint.clusterId} in meta.properties. " +
             s"The broker is trying to join the wrong cluster. Configured 
zookeeper.connect may be wrong.")
 
         /* generate brokerId */
diff --git a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala 
b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
index 749b6dd66f..f88a4cc907 100644
--- a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
+++ b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
@@ -44,8 +44,7 @@ object PartitionMetadataFile {
   }
 
   class PartitionMetadataReadBuffer[T](location: String,
-                                       reader: BufferedReader,
-                                       version: Int) extends Logging {
+                                       reader: BufferedReader) extends Logging 
{
     def read(): PartitionMetadata = {
       def malformedLineException(line: String) =
         new IOException(s"Malformed line in checkpoint file ($location): 
'$line'")
@@ -141,7 +140,7 @@ class PartitionMetadataFile(val file: File,
       try {
         val reader = Files.newBufferedReader(path)
         try {
-          val partitionBuffer = new 
PartitionMetadataReadBuffer(file.getAbsolutePath, reader, CurrentVersion)
+          val partitionBuffer = new 
PartitionMetadataReadBuffer(file.getAbsolutePath, reader)
           partitionBuffer.read()
         } finally {
           reader.close()
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index eff4824c47..831212f9eb 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -711,7 +711,7 @@ class ReplicaManager(val config: KafkaConfig,
           /* If the topic name is exceptionally long, we can't support 
altering the log directory.
            * See KAFKA-4893 for details.
            * TODO: fix this by implementing topic IDs. */
-          if (UnifiedLog.logFutureDirName(topicPartition).size > 255)
+          if (UnifiedLog.logFutureDirName(topicPartition).length > 255)
             throw new InvalidTopicException("The topic name is too long.")
           if (!logManager.isLogDirOnline(destinationDir))
             throw new KafkaStorageException(s"Log directory $destinationDir is 
offline")
@@ -2082,28 +2082,27 @@ class ReplicaManager(val config: KafkaConfig,
                                           topicId: Uuid): Option[(Partition, 
Boolean)] = {
     getPartition(tp) match {
       case HostedPartition.Offline =>
-        stateChangeLogger.warn(s"Unable to bring up new local leader ${tp} " +
-          s"with topic id ${topicId} because it resides in an offline log " +
+        stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
+          s"with topic id $topicId because it resides in an offline log " +
           "directory.")
         None
 
-      case HostedPartition.Online(partition) => {
+      case HostedPartition.Online(partition) =>
         if (partition.topicId.exists(_ != topicId)) {
           // Note: Partition#topicId will be None here if the Log object for 
this partition
           // has not been created.
-          throw new IllegalStateException(s"Topic ${tp} exists, but its ID is 
" +
-            s"${partition.topicId.get}, not ${topicId} as expected")
+          throw new IllegalStateException(s"Topic $tp exists, but its ID is " +
+            s"${partition.topicId.get}, not $topicId as expected")
         }
         Some(partition, false)
-      }
 
       case HostedPartition.None =>
         if (delta.image().topicsById().containsKey(topicId)) {
-          stateChangeLogger.error(s"Expected partition ${tp} with topic id " +
-            s"${topicId} to exist, but it was missing. Creating...")
+          stateChangeLogger.error(s"Expected partition $tp with topic id " +
+            s"$topicId to exist, but it was missing. Creating...")
         } else {
-          stateChangeLogger.info(s"Creating new partition ${tp} with topic id 
" +
-            s"${topicId}.")
+          stateChangeLogger.info(s"Creating new partition $tp with topic id " +
+            s"$topicId.")
         }
         // it's a partition that we don't know about yet, so create it and 
mark it online
         val partition = Partition(tp, time, this)
@@ -2130,10 +2129,10 @@ class ReplicaManager(val config: KafkaConfig,
         stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).")
         stopPartitions(deletes).forKeyValue { (topicPartition, e) =>
           if (e.isInstanceOf[KafkaStorageException]) {
-            stateChangeLogger.error(s"Unable to delete replica 
${topicPartition} because " +
+            stateChangeLogger.error(s"Unable to delete replica $topicPartition 
because " +
               "the local replica for the partition is in an offline log 
directory")
           } else {
-            stateChangeLogger.error(s"Unable to delete replica 
${topicPartition} because " +
+            stateChangeLogger.error(s"Unable to delete replica $topicPartition 
because " +
               s"we got an unexpected ${e.getClass.getName} exception: 
${e.getMessage}")
           }
         }
diff --git a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala 
b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
index 3035cb1371..c02936973d 100644
--- a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
@@ -79,7 +79,7 @@ class ReplicationQuotaManager(val config: 
ReplicationQuotaManagerConfig,
   private var quota: Quota = null
   private val sensorAccess = new SensorAccess(lock, metrics)
   private val rateMetricName = metrics.metricName("byte-rate", 
replicationType.toString,
-    s"Tracking byte-rate for ${replicationType}")
+    s"Tracking byte-rate for $replicationType")
 
   /**
     * Update the quota
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 5b71409714..54a777f67f 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -41,7 +41,7 @@ class BrokerMetadataListener(
   val maxBytesBetweenSnapshots: Long,
   val snapshotter: Option[MetadataSnapshotter]
 ) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
-  private val logContext = new LogContext(s"[BrokerMetadataListener 
id=${brokerId}] ")
+  private val logContext = new LogContext(s"[BrokerMetadataListener 
id=$brokerId] ")
   private val log = logContext.logger(classOf[BrokerMetadataListener])
   logIdent = logContext.logPrefix()
 
@@ -110,7 +110,7 @@ class BrokerMetadataListener(
       val results = try {
         val loadResults = loadBatches(_delta, reader, None, None, None)
         if (isDebugEnabled) {
-          debug(s"Loaded new commits: ${loadResults}")
+          debug(s"Loaded new commits: $loadResults")
         }
         loadResults
       } finally {
@@ -154,7 +154,7 @@ class BrokerMetadataListener(
         )
         _delta.finishSnapshot()
         info(s"Loaded snapshot 
${reader.snapshotId().offset}-${reader.snapshotId().epoch}: " +
-          s"${loadResults}")
+          s"$loadResults")
       } finally {
         reader.close()
       }
@@ -163,9 +163,9 @@ class BrokerMetadataListener(
   }
 
   case class BatchLoadResults(numBatches: Int, numRecords: Int, elapsedUs: 
Long, numBytes: Long) {
-    override def toString(): String = {
-      s"${numBatches} batch(es) with ${numRecords} record(s) in ${numBytes} 
bytes " +
-        s"ending at offset ${highestMetadataOffset} in ${elapsedUs} 
microseconds"
+    override def toString: String = {
+      s"$numBatches batch(es) with $numRecords record(s) in $numBytes bytes " +
+        s"ending at offset $highestMetadataOffset in $elapsedUs microseconds"
     }
   }
 
@@ -194,7 +194,7 @@ class BrokerMetadataListener(
     var numRecords = 0
     var numBytes = 0L
 
-    while (iterator.hasNext()) {
+    while (iterator.hasNext) {
       val batch = iterator.next()
 
       val epoch = lastCommittedEpoch.getOrElse(batch.epoch())
@@ -236,7 +236,7 @@ class BrokerMetadataListener(
 
     override def run(): Unit = {
       _publisher = Some(publisher)
-      log.info(s"Starting to publish metadata events at offset 
${highestMetadataOffset}.")
+      log.info(s"Starting to publish metadata events at offset 
$highestMetadataOffset.")
       try {
         publish(publisher)
         future.complete(null)
@@ -271,7 +271,7 @@ class BrokerMetadataListener(
     _image = _delta.apply()
     _delta = new MetadataDelta(_image)
     if (isDebugEnabled) {
-      debug(s"Publishing new metadata delta ${delta} at offset 
${_image.highestOffsetAndEpoch().offset}.")
+      debug(s"Publishing new metadata delta $delta at offset 
${_image.highestOffsetAndEpoch().offset}.")
     }
     publisher.publish(delta, _image)
   }
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 291a1507d2..e653e6e5b2 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -111,7 +111,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
   /**
    * The broker ID.
    */
-  val brokerId = conf.nodeId
+  val brokerId: Int = conf.nodeId
 
   /**
    * True if this is the first time we have published metadata.
@@ -199,7 +199,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
                   processConfigChanges(ConfigEntityName.Default, props)
               } else if (resource.name() == brokerId.toString) {
                 // Apply changes to this broker's dynamic configuration.
-                info(s"Updating broker ${brokerId} with new configuration : " +
+                info(s"Updating broker $brokerId with new configuration : " +
                   toLoggableProps(resource, props).mkString(","))
                 dynamicConfigHandlers(ConfigType.Broker).
                   processConfigChanges(resource.name(), props)
@@ -227,7 +227,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
       // there could be a window during which incorrect authorization results 
are returned.
       Option(delta.aclsDelta()).foreach( aclsDelta =>
         _authorizer match {
-          case Some(authorizer: ClusterMetadataAuthorizer) => if 
(aclsDelta.isSnapshotDelta()) {
+          case Some(authorizer: ClusterMetadataAuthorizer) => if 
(aclsDelta.isSnapshotDelta) {
             // If the delta resulted from a snapshot load, we want to apply 
the new changes
             // all at once using ClusterMetadataAuthorizer#loadSnapshot. If 
this is the
             // first snapshot load, it will also complete the futures returned 
by
@@ -237,10 +237,10 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
             // Because the changes map is a LinkedHashMap, the deltas will be 
returned in
             // the order they were performed.
             aclsDelta.changes().entrySet().forEach(e =>
-              if (e.getValue().isPresent()) {
-                authorizer.addAcl(e.getKey(), e.getValue().get())
+              if (e.getValue.isPresent) {
+                authorizer.addAcl(e.getKey, e.getValue.get())
               } else {
-                authorizer.removeAcl(e.getKey())
+                authorizer.removeAcl(e.getKey)
               })
           }
           case _ => // No ClusterMetadataAuthorizer is configured. There is 
nothing to do.
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
index fb5bfbbd81..f32c4d3238 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
@@ -38,7 +38,7 @@ class BrokerMetadataSnapshotter(
   threadNamePrefix: Option[String],
   writerBuilder: SnapshotWriterBuilder
 ) extends Logging with MetadataSnapshotter {
-  private val logContext = new LogContext(s"[BrokerMetadataSnapshotter 
id=${brokerId}] ")
+  private val logContext = new LogContext(s"[BrokerMetadataSnapshotter 
id=$brokerId] ")
   logIdent = logContext.logPrefix()
 
   /**
diff --git 
a/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala 
b/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala
index 6ada6b258c..3f4b136fb9 100644
--- a/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala
+++ b/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala
@@ -98,10 +98,10 @@ class ClientQuotaMetadataManager(private[metadata] val 
quotaManagers: QuotaManag
         }
       }
       quotaDelta.changes().entrySet().forEach { e =>
-        handleUserClientQuotaChange(userClientEntity, e.getKey(), 
e.getValue().asScala.map(_.toDouble))
+        handleUserClientQuotaChange(userClientEntity, e.getKey, 
e.getValue.asScala.map(_.toDouble))
       }
     } else {
-      warn(s"Ignoring unsupported quota entity ${entity}.")
+      warn(s"Ignoring unsupported quota entity $entity.")
     }
   }
 
@@ -119,10 +119,10 @@ class ClientQuotaMetadataManager(private[metadata] val 
quotaManagers: QuotaManag
 
     quotaDelta.changes().entrySet().forEach { e =>
       // The connection quota only understands the connection rate limit
-      val quotaName = e.getKey()
-      val quotaValue = e.getValue()
+      val quotaName = e.getKey
+      val quotaValue = e.getValue
       if (!quotaName.equals(QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG)) {
-        warn(s"Ignoring unexpected quota key ${quotaName} for entity 
$ipEntity")
+        warn(s"Ignoring unexpected quota key $quotaName for entity $ipEntity")
       } else {
         try {
           connectionQuotas.updateIpConnectionRateQuota(inetAddress, 
quotaValue.asScala.map(_.toInt))
@@ -140,7 +140,7 @@ class ClientQuotaMetadataManager(private[metadata] val 
quotaManagers: QuotaManag
       case QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG => 
quotaManagers.request
       case QuotaConfigs.CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG => 
quotaManagers.controllerMutation
       case _ =>
-        warn(s"Ignoring unexpected quota key ${key} for entity $quotaEntity")
+        warn(s"Ignoring unexpected quota key $key for entity $quotaEntity")
         return
     }
 
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala 
b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 1ff7a8076c..880950b3a8 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -97,10 +97,10 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
         maybeLeader match {
           case None =>
             val error = if 
(!image.cluster().brokers.containsKey(partition.leader)) {
-              debug(s"Error while fetching metadata for 
${topicName}-${partitionId}: leader not available")
+              debug(s"Error while fetching metadata for 
$topicName-$partitionId: leader not available")
               Errors.LEADER_NOT_AVAILABLE
             } else {
-              debug(s"Error while fetching metadata for 
${topicName}-${partitionId}: listener $listenerName " +
+              debug(s"Error while fetching metadata for 
$topicName-$partitionId: listener $listenerName " +
                 s"not found on leader ${partition.leader}")
               if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else 
Errors.LEADER_NOT_AVAILABLE
             }
@@ -113,12 +113,12 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
               .setIsrNodes(filteredIsr)
               .setOfflineReplicas(offlineReplicas)
           case Some(leader) =>
-            val error = if (filteredReplicas.size < partition.replicas.size) {
-              debug(s"Error while fetching metadata for 
${topicName}-${partitionId}: replica information not available for " +
+            val error = if (filteredReplicas.size < partition.replicas.length) 
{
+              debug(s"Error while fetching metadata for 
$topicName-$partitionId: replica information not available for " +
                 s"following brokers 
${partition.replicas.filterNot(filteredReplicas.contains).mkString(",")}")
               Errors.REPLICA_NOT_AVAILABLE
-            } else if (filteredIsr.size < partition.isr.size) {
-              debug(s"Error while fetching metadata for 
${topicName}-${partitionId}: in sync replica information not available for " +
+            } else if (filteredIsr.size < partition.isr.length) {
+              debug(s"Error while fetching metadata for 
$topicName-$partitionId: in sync replica information not available for " +
                 s"following brokers 
${partition.isr.filterNot(filteredIsr.contains).mkString(",")}")
               Errors.REPLICA_NOT_AVAILABLE
             } else {
@@ -266,8 +266,8 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
     val image = _currentImage
     val result = new mutable.HashMap[Int, Node]()
     Option(image.topics().getTopic(tp.topic())).foreach { topic =>
-      topic.partitions().values().forEach { case partition =>
-        partition.replicas.map { case replicaId =>
+      topic.partitions().values().forEach { partition =>
+        partition.replicas.map { replicaId =>
           result.put(replicaId, Option(image.cluster().broker(replicaId)) 
match {
             case None => Node.noNode()
             case Some(broker) => 
broker.node(listenerName.value()).asScala.getOrElse(Node.noNode())
@@ -288,7 +288,7 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
    */
   private def getRandomAliveBroker(image: MetadataImage): Option[Int] = {
     val aliveBrokers = getAliveBrokers(image).toList
-    if (aliveBrokers.size == 0) {
+    if (aliveBrokers.isEmpty) {
       None
     } else {
       
Some(aliveBrokers(ThreadLocalRandom.current().nextInt(aliveBrokers.size)).id)
@@ -315,8 +315,8 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
 
     image.topics().topicsByName().values().forEach { topic =>
       topic.partitions().entrySet().forEach { entry =>
-        val partitionId = entry.getKey()
-        val partition = entry.getValue()
+        val partitionId = entry.getKey
+        val partition = entry.getValue
         partitionInfos.add(new PartitionInfo(topic.name(),
           partitionId,
           node(partition.leader),
diff --git a/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala 
b/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala
index 8f8dfcd1a0..16842bcd11 100644
--- a/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala
@@ -39,7 +39,7 @@ class ZkConfigRepository(adminZkClient: AdminZkClient) 
extends ConfigRepository
     }
     // ZK stores cluster configs under "<default>".
     val effectiveName = if (configResource.`type`.equals(Type.BROKER) &&
-        configResource.name.isEmpty()) {
+        configResource.name.isEmpty) {
       ConfigEntityName.Default
     } else {
       configResource.name
diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala 
b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
index 80726ce06b..1eaee48416 100644
--- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala
+++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
@@ -52,7 +52,7 @@ object CommandLineUtils extends Logging {
     * @param commandOpts Acceptable options for a command
     * @param message     Message to display on successful check
     */
-  def printHelpAndExitIfNeeded(commandOpts: CommandDefaultOptions, message: 
String) = {
+  def printHelpAndExitIfNeeded(commandOpts: CommandDefaultOptions, message: 
String): Unit = {
     if (isPrintHelpNeeded(commandOpts))
       printUsageAndDie(commandOpts.parser, message)
     if (isPrintVersionNeeded(commandOpts))
@@ -117,7 +117,7 @@ object CommandLineUtils extends Logging {
 
     val props = new Properties
     for (a <- splits) {
-      if (a.length == 1 || (a.length == 2 && a(1).isEmpty())) {
+      if (a.length == 1 || (a.length == 2 && a(1).isEmpty)) {
         if (acceptMissingValue) props.put(a(0), "")
         else throw new IllegalArgumentException(s"Missing value for key 
${a(0)}")
       }
diff --git a/core/src/main/scala/kafka/utils/Exit.scala 
b/core/src/main/scala/kafka/utils/Exit.scala
index ad17237571..eddd929af5 100644
--- a/core/src/main/scala/kafka/utils/Exit.scala
+++ b/core/src/main/scala/kafka/utils/Exit.scala
@@ -45,7 +45,7 @@ object Exit {
     JExit.setHaltProcedure(functionToProcedure(haltProcedure))
 
   def setShutdownHookAdder(shutdownHookAdder: (String, => Unit) => Unit): Unit 
= {
-    JExit.setShutdownHookAdder((name, runnable) => shutdownHookAdder(name, 
runnable.run))
+    JExit.setShutdownHookAdder((name, runnable) => shutdownHookAdder(name, 
runnable.run()))
   }
 
   def resetExitProcedure(): Unit =
diff --git a/core/src/main/scala/kafka/utils/FileLock.scala 
b/core/src/main/scala/kafka/utils/FileLock.scala
index c635f76dff..2de16386c9 100644
--- a/core/src/main/scala/kafka/utils/FileLock.scala
+++ b/core/src/main/scala/kafka/utils/FileLock.scala
@@ -73,7 +73,7 @@ class FileLock(val file: File) extends Logging {
   /**
    * Destroy this lock, closing the associated FileChannel
    */
-  def destroy() = {
+  def destroy(): Unit = {
     this synchronized {
       unlock()
       channel.close()
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala 
b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index bec511b3f7..354652ee6f 100755
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -135,7 +135,7 @@ class KafkaScheduler(val threads: Int,
    * Package private for testing.
    */
   private[kafka] def taskRunning(task: ScheduledFuture[_]): Boolean = {
-    executor.getQueue().contains(task)
+    executor.getQueue.contains(task)
   }
 
   def resizeThreadPool(newSize: Int): Unit = {
diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala 
b/core/src/main/scala/kafka/utils/ToolsUtils.scala
index 0f3de767fd..056545cb03 100644
--- a/core/src/main/scala/kafka/utils/ToolsUtils.scala
+++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
 
 object ToolsUtils {
 
-  def validatePortOrDie(parser: OptionParser, hostPort: String) = {
+  def validatePortOrDie(parser: OptionParser, hostPort: String): Unit = {
     val hostPorts: Array[String] = if(hostPort.contains(','))
       hostPort.split(",")
     else
diff --git a/core/src/main/scala/kafka/utils/VersionInfo.scala 
b/core/src/main/scala/kafka/utils/VersionInfo.scala
index 9d3130e668..203488a64b 100644
--- a/core/src/main/scala/kafka/utils/VersionInfo.scala
+++ b/core/src/main/scala/kafka/utils/VersionInfo.scala
@@ -35,6 +35,6 @@ object VersionInfo {
   }
 
   def getVersionString: String = {
-    s"${getVersion} (Commit:${getCommit})"
+    s"$getVersion (Commit:$getCommit)"
   }
 }

Reply via email to