This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9d04c7a0459 MINOR: Various Scala cleanups in core (#14558)
9d04c7a0459 is described below

commit 9d04c7a0459ff4e13d9385427eb244a47a1071da
Author: Mickael Maison <[email protected]>
AuthorDate: Tue Oct 17 12:04:14 2023 +0200

    MINOR: Various Scala cleanups in core (#14558)
    
    
    Reviewers: Ismael Juma <[email protected]>
---
 .../scala/kafka/common/UnknownCodecException.scala | 26 ------------------
 .../scala/kafka/server/AbstractFetcherThread.scala | 11 ++++----
 .../kafka/server/BrokerLifecycleManager.scala      |  4 +--
 .../kafka/server/ClientRequestQuotaManager.scala   |  5 ++--
 .../server/ControllerConfigurationValidator.scala  | 10 +++----
 .../server/ControllerMutationQuotaManager.scala    |  1 -
 .../server/ControllerRegistrationManager.scala     |  8 +++---
 .../src/main/scala/kafka/server/DelayedFetch.scala |  6 ++--
 .../main/scala/kafka/server/DelayedOperation.scala |  2 +-
 .../main/scala/kafka/server/DelayedProduce.scala   |  4 +--
 core/src/main/scala/kafka/server/KafkaConfig.scala | 32 +++++++++++-----------
 .../main/scala/kafka/server/KafkaRaftServer.scala  |  4 +--
 .../scala/kafka/server/PartitionMetadataFile.scala |  2 +-
 .../scala/kafka/server/metadata/AclPublisher.scala |  2 +-
 .../server/metadata/BrokerServerMetrics.scala      |  2 +-
 .../server/metadata/DelegationTokenPublisher.scala |  6 ++--
 .../metadata/DynamicClientQuotaPublisher.scala     |  4 +--
 .../server/metadata/DynamicConfigPublisher.scala   | 10 +++----
 .../kafka/server/metadata/ScramPublisher.scala     |  4 +--
 core/src/main/scala/kafka/utils/CoreUtils.scala    |  6 ++--
 core/src/main/scala/kafka/utils/Mx4jLoader.scala   |  6 ++--
 .../main/scala/kafka/utils/PasswordEncoder.scala   |  2 +-
 core/src/main/scala/kafka/utils/Throttler.scala    |  4 +--
 core/src/main/scala/kafka/utils/ToolsUtils.scala   |  2 +-
 .../scala/kafka/utils/VerifiableProperties.scala   |  2 +-
 25 files changed, 70 insertions(+), 95 deletions(-)
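
The diff below is mostly mechanical: dropping redundant semicolons and empty parentheses, annotating public members with explicit result types, replacing `!isEmpty`/`isEmpty()` with `nonEmpty`/`isEmpty`, simplifying `${x}` to `$x` in interpolated strings, and naming bare boolean arguments. A minimal sketch of the before/after style (names invented for illustration, not taken from the Kafka codebase):

    object StyleBefore {
      val count = 0;                                  // redundant semicolon
      def size = count                                // public member with inferred type
      def isBlank(s: String): Boolean = s.isEmpty()   // empty parens on a side-effect-free accessor
      def describe(n: Int): String = s"n is ${n}"     // braces not needed for a simple identifier
      def hasItems(xs: Seq[Int]): Boolean = !xs.isEmpty
    }

    object StyleAfter {
      val count = 0
      def size: Int = count                           // explicit type on the public member
      def isBlank(s: String): Boolean = s.isEmpty
      def describe(n: Int): String = s"n is $n"
      def hasItems(xs: Seq[Int]): Boolean = xs.nonEmpty
    }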

diff --git a/core/src/main/scala/kafka/common/UnknownCodecException.scala b/core/src/main/scala/kafka/common/UnknownCodecException.scala
deleted file mode 100644
index 7e669019c32..00000000000
--- a/core/src/main/scala/kafka/common/UnknownCodecException.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-/**
- * Indicates the client has requested a range no longer available on the server
- */
-class UnknownCodecException(message: String) extends RuntimeException(message) {
-  def this() = this(null)
-}
-
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 450fcfea461..935599048da 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -17,6 +17,7 @@
 
 package kafka.server
 
+import com.yammer.metrics.core.Meter
 import kafka.common.ClientIdAndBroker
 import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
 import kafka.utils.CoreUtils.inLock
@@ -759,7 +760,7 @@ abstract class AbstractFetcherThread(name: String,
                                                 leaderEpochInRequest: Optional[Integer],
                                                 fetchPartitionData: PartitionData): Boolean = {
     try {
-      val newFetchState = fetchTierStateMachine.start(topicPartition, fetchState, fetchPartitionData);
+      val newFetchState = fetchTierStateMachine.start(topicPartition, fetchState, fetchPartitionData)
 
       // TODO: use fetchTierStateMachine.maybeAdvanceState when implementing async tiering logic in KAFKA-13560
 
@@ -879,7 +880,7 @@ class FetcherLagMetrics(metricId: ClientIdTopicPartition) {
     lagVal.set(newLag)
   }
 
-  def lag = lagVal.get
+  def lag: Long = lagVal.get
 
   def unregister(): Unit = {
     metricsGroup.removeMetric(FetcherMetrics.ConsumerLag, tags)
@@ -909,13 +910,13 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
 class FetcherStats(metricId: ClientIdAndBroker) {
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
-  val tags = Map("clientId" -> metricId.clientId,
+  val tags: util.Map[String, String] = Map("clientId" -> metricId.clientId,
     "brokerHost" -> metricId.brokerHost,
     "brokerPort" -> metricId.brokerPort.toString).asJava
 
-  val requestRate = metricsGroup.newMeter(FetcherMetrics.RequestsPerSec, "requests", TimeUnit.SECONDS, tags)
+  val requestRate: Meter = metricsGroup.newMeter(FetcherMetrics.RequestsPerSec, "requests", TimeUnit.SECONDS, tags)
 
-  val byteRate = metricsGroup.newMeter(FetcherMetrics.BytesPerSec, "bytes", TimeUnit.SECONDS, tags)
+  val byteRate: Meter = metricsGroup.newMeter(FetcherMetrics.BytesPerSec, "bytes", TimeUnit.SECONDS, tags)
 
   def unregister(): Unit = {
     metricsGroup.removeMetric(FetcherMetrics.RequestsPerSec, tags)
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index c17fe7c0c38..10074d4a5f5 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -252,7 +252,7 @@ class BrokerLifecycleManager(
    * Start shutting down the BrokerLifecycleManager, but do not block.
    */
   def beginShutdown(): Unit = {
-    eventQueue.beginShutdown("beginShutdown");
+    eventQueue.beginShutdown("beginShutdown")
   }
 
   /**
@@ -483,7 +483,7 @@ class BrokerLifecycleManager(
     override def run(): Unit = {
       if (!initialRegistrationSucceeded) {
         error("Shutting down because we were unable to register with the 
controller quorum.")
-        eventQueue.beginShutdown("registrationTimeout");
+        eventQueue.beginShutdown("registrationTimeout")
       }
     }
   }
diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
index 6e57d97bc3e..d330210b9d5 100644
--- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
@@ -28,11 +28,10 @@ import org.apache.kafka.server.quota.ClientQuotaCallback
 import scala.jdk.CollectionConverters._
 
 object ClientRequestQuotaManager {
-  val QuotaRequestPercentDefault = Int.MaxValue.toDouble
-  val NanosToPercentagePerSecond = 100.0 / TimeUnit.SECONDS.toNanos(1)
+  val NanosToPercentagePerSecond: Double = 100.0 / TimeUnit.SECONDS.toNanos(1)
   // Since exemptSensor is for all clients and has a constant name, we do not expire exemptSensor and only
   // create once.
-  val DefaultInactiveExemptSensorExpirationTimeSeconds = Long.MaxValue
+  val DefaultInactiveExemptSensorExpirationTimeSeconds: Long = Long.MaxValue
 
   private val ExemptSensorName = "exempt-" + QuotaType.Request
 }
diff --git a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
index ccdd0ac31af..e095ecdc051 100644
--- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
+++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
@@ -46,7 +46,7 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
   private def validateTopicName(
     name: String
   ): Unit = {
-    if (name.isEmpty()) {
+    if (name.isEmpty) {
       throw new InvalidRequestException("Default topic resources are not allowed.")
     }
     Topic.validate(name)
@@ -55,7 +55,7 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
   private def validateBrokerName(
     name: String
   ): Unit = {
-    if (!name.isEmpty()) {
+    if (name.nonEmpty) {
       val brokerId = try {
         Integer.valueOf(name)
       } catch {
@@ -96,10 +96,10 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
         val properties = new Properties()
         val nullTopicConfigs = new mutable.ArrayBuffer[String]()
         config.entrySet().forEach(e => {
-          if (e.getValue() == null) {
-            nullTopicConfigs += e.getKey()
+          if (e.getValue == null) {
+            nullTopicConfigs += e.getKey
           } else {
-            properties.setProperty(e.getKey(), e.getValue())
+            properties.setProperty(e.getKey, e.getValue)
           }
         })
         if (nullTopicConfigs.nonEmpty) {
diff --git a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
index f011a6b3663..e21c4699bcd 100644
--- a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
@@ -132,7 +132,6 @@ class PermissiveControllerMutationQuota(private val time: Time,
 }
 
 object ControllerMutationQuotaManager {
-  val QuotaControllerMutationDefault = Int.MaxValue.toDouble
 
   /**
    * This calculates the amount of time needed to bring the TokenBucket within quota
diff --git a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala b/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
index c5868c05732..ae717bce624 100644
--- a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
+++ b/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
@@ -143,7 +143,7 @@ class ControllerRegistrationManager(
    * Start shutting down the ControllerRegistrationManager, but do not block.
    */
   def beginShutdown(): Unit = {
-    eventQueue.beginShutdown("beginShutdown");
+    eventQueue.beginShutdown("beginShutdown")
   }
 
   /**
@@ -206,7 +206,7 @@ class ControllerRegistrationManager(
       info("maybeSendControllerRegistration: cannot register yet because the 
metadata version is " +
           s"still $metadataVersion, which does not support KIP-919 controller 
registration.")
     } else if (pendingRpc) {
-      info("maybeSendControllerRegistration: waiting for the previous RPC to 
complete.");
+      info("maybeSendControllerRegistration: waiting for the previous RPC to 
complete.")
     } else {
       sendControllerRegistration()
     }
@@ -224,7 +224,7 @@ class ControllerRegistrationManager(
       setControllerId(nodeId).
       setFeatures(features).
       setIncarnationId(incarnationId).
-      setListeners(listenerInfo.toControllerRegistrationRequest())
+      setListeners(listenerInfo.toControllerRegistrationRequest)
     info(s"sendControllerRegistration: attempting to send $data")
     _channelManager.sendRequest(new ControllerRegistrationRequest.Builder(data),
       new RegistrationResponseHandler())
@@ -274,7 +274,7 @@ class ControllerRegistrationManager(
   }
 
   private def scheduleNextCommunication(intervalMs: Long): Unit = {
-    trace(s"Scheduling next communication at ${intervalMs} ms from now.")
+    trace(s"Scheduling next communication at $intervalMs ms from now.")
     val deadlineNs = time.nanoseconds() + MILLISECONDS.toNanos(intervalMs)
     eventQueue.scheduleDeferred("communication",
       new DeadlineFunction(deadlineNs),
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 9ce6082e76c..f8b60b6071d 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -17,6 +17,8 @@
 
 package kafka.server
 
+import com.yammer.metrics.core.Meter
+
 import java.util.concurrent.TimeUnit
 import org.apache.kafka.common.TopicIdPartition
 import org.apache.kafka.common.errors._
@@ -184,7 +186,7 @@ class DelayedFetch(
 object DelayedFetchMetrics {
   private val metricsGroup = new KafkaMetricsGroup(DelayedFetchMetrics.getClass)
   private val FetcherTypeKey = "fetcherType"
-  val followerExpiredRequestMeter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "follower").asJava)
-  val consumerExpiredRequestMeter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "consumer").asJava)
+  val followerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "follower").asJava)
+  val consumerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "consumer").asJava)
 }
 
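In both AbstractFetcherThread and DelayedFetch above, annotating the meters as Meter is what forces the new com.yammer.metrics.core.Meter import. A small sketch of why explicit result types on public members are preferred (hypothetical class, not from the codebase):

    import java.util.concurrent.atomic.AtomicLong

    class LagTracker {
      private val lagVal = new AtomicLong(0)

      // Without the annotation, readers must infer the result type from the
      // implementation; spelling out `: Long` pins the public contract even
      // if the backing field changes later.
      def lag: Long = lagVal.get
    }
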
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 48a17442e0e..629199148a8 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -167,7 +167,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
      * Return all the current watcher lists,
      * note that the returned watchers may be removed from the list by other threads
      */
-    def allWatchers = {
+    def allWatchers: Iterable[Watchers] = {
       watchersByKey.values
     }
   }
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 2f1261ada23..7a21a86260c 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -34,7 +34,7 @@ import scala.jdk.CollectionConverters._
 case class ProducePartitionStatus(requiredOffset: Long, responseStatus: PartitionResponse) {
   @volatile var acksPending = false
 
-  override def toString = s"[acksPending: $acksPending, error: ${responseStatus.error.code}, " +
+  override def toString: String = s"[acksPending: $acksPending, error: ${responseStatus.error.code}, " +
     s"startOffset: ${responseStatus.baseOffset}, requiredOffset: $requiredOffset]"
 }
 
@@ -62,7 +62,7 @@ class DelayedProduce(delayMs: Long,
                      lockOpt: Option[Lock] = None)
   extends DelayedOperation(delayMs, lockOpt) {
 
-  override lazy val logger = DelayedProduce.logger
+  override lazy val logger: Logger = DelayedProduce.logger
 
   // first update the acks pending variable according to the error code
   produceMetadata.produceStatus.forKeyValue { (topicPartition, status) =>
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 618faeee790..485d76f7661 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -83,7 +83,7 @@ object Defaults {
   val BrokerHeartbeatIntervalMs = 2000
   val BrokerSessionTimeoutMs = 9000
   val MetadataSnapshotMaxNewRecordBytes = 20 * 1024 * 1024
-  val MetadataSnapshotMaxIntervalMs = TimeUnit.HOURS.toMillis(1);
+  val MetadataSnapshotMaxIntervalMs = TimeUnit.HOURS.toMillis(1)
   val MetadataMaxIdleIntervalMs = 500
   val MetadataMaxRetentionBytes = 100 * 1024 * 1024
   val DeleteTopicEnable = true
@@ -224,7 +224,7 @@ object Defaults {
   val MetricNumSamples = 2
   val MetricSampleWindowMs = 30000
   val MetricReporterClasses = ""
-  val MetricRecordingLevel = Sensor.RecordingLevel.INFO.toString()
+  val MetricRecordingLevel = Sensor.RecordingLevel.INFO.toString
   val AutoIncludeJmxReporter = true
 
 
@@ -241,7 +241,7 @@ object Defaults {
   val SslTrustManagerAlgorithm = SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM
   val SslEndpointIdentificationAlgorithm = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
   val SslClientAuthentication = SslClientAuth.NONE.name().toLowerCase(Locale.ROOT)
-  val SslClientAuthenticationValidValues = SslClientAuth.VALUES.asScala.map(v => v.toString().toLowerCase(Locale.ROOT)).asJava.toArray(new Array[String](0))
+  val SslClientAuthenticationValidValues = SslClientAuth.VALUES.asScala.map(v => v.toString.toLowerCase(Locale.ROOT)).asJava.toArray(new Array[String](0))
   val SslPrincipalMappingRules = BrokerSecurityConfigs.DEFAULT_SSL_PRINCIPAL_MAPPING_RULES
 
     /** ********* General Security configuration ***********/
@@ -754,7 +754,7 @@ object KafkaConfig {
     "maximum bytes limit is reached."
   val MetadataMaxIdleIntervalMsDoc = "This configuration controls how often the active " +
     "controller should write no-op records to the metadata partition. If the value is 0, no-op records " +
-    s"are not appended to the metadata partition. The default value is ${Defaults.MetadataMaxIdleIntervalMs}";
+    s"are not appended to the metadata partition. The default value is ${Defaults.MetadataMaxIdleIntervalMs}"
   val ControllerListenerNamesDoc = "A comma-separated list of the names of the listeners used by the controller. This is required " +
     "if running in KRaft mode. When communicating with the controller quorum, the broker will always use the first listener in this list.\n " +
     "Note: The ZooKeeper-based controller should not set this configuration."
@@ -886,7 +886,7 @@ object KafkaConfig {
   val LogCleanerEnableDoc = "Enable the log cleaner process to run on the server. Should be enabled if using any topics with a cleanup.policy=compact including the internal offsets topic. If disabled those topics will not be compacted and continually grow in size."
   val LogCleanerDeleteRetentionMsDoc = "The amount of time to retain tombstone message markers for log compacted topics. This setting also gives a bound " +
     "on the time in which a consumer must complete a read if they begin from offset 0 to ensure that they get a valid snapshot of the final stage (otherwise " +
-    "tombstones messages may be collected before a consumer completes their scan).";
+    "tombstones messages may be collected before a consumer completes their scan)."
   val LogCleanerMinCompactionLagMsDoc = "The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted."
   val LogCleanerMaxCompactionLagMsDoc = "The maximum time a message will remain ineligible for compaction in the log. Only applicable for logs that are being compacted."
   val LogIndexSizeMaxBytesDoc = "The maximum size in bytes of the offset index"
@@ -915,13 +915,13 @@ object KafkaConfig {
     "broker's timestamp and the message timestamp. The message timestamp can 
be earlier than or equal to the broker's " +
     "timestamp, with the maximum allowable difference determined by the value 
set in this configuration. " +
     "If log.message.timestamp.type=CreateTime, the message will be rejected if 
the difference in timestamps exceeds " +
-    "this specified threshold. This configuration is ignored if 
log.message.timestamp.type=LogAppendTime.";
+    "this specified threshold. This configuration is ignored if 
log.message.timestamp.type=LogAppendTime."
 
   val LogMessageTimestampAfterMaxMsDoc = "This configuration sets the allowable timestamp difference between the " +
     "message timestamp and the broker's timestamp. The message timestamp can be later than or equal to the broker's " +
     "timestamp, with the maximum allowable difference determined by the value set in this configuration. " +
     "If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " +
-    "this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime.";
+    "this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime."
 
   val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown"
   val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server."
@@ -939,7 +939,7 @@ object KafkaConfig {
     "implement the 
<code>org.apache.kafka.server.policy.CreateTopicPolicy</code> interface."
   val AlterConfigPolicyClassNameDoc = "The alter configs policy class that 
should be used for validation. The class should " +
     "implement the 
<code>org.apache.kafka.server.policy.AlterConfigPolicy</code> interface."
-  val LogMessageDownConversionEnableDoc = 
TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_DOC;
+  val LogMessageDownConversionEnableDoc = 
TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_DOC
 
   /** ********* Replication configuration ***********/
   val ControllerSocketTimeoutMsDoc = "The socket timeout for controller-to-broker channels."
@@ -1949,7 +1949,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
       // A user-supplied IBP was given
       val configuredVersion = MetadataVersion.fromVersionString(interBrokerProtocolVersionString)
       if (!configuredVersion.isKRaftSupported) {
-        throw new ConfigException(s"A non-KRaft version ${interBrokerProtocolVersionString} given for ${KafkaConfig.InterBrokerProtocolVersionProp}. " +
+        throw new ConfigException(s"A non-KRaft version $interBrokerProtocolVersionString given for ${KafkaConfig.InterBrokerProtocolVersionProp}. " +
           s"The minimum version is ${MetadataVersion.MINIMUM_KRAFT_VERSION}")
       } else {
         warn(s"${KafkaConfig.InterBrokerProtocolVersionProp} is deprecated in 
KRaft mode as of 3.3 and will only " +
@@ -1968,7 +1968,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   val controlledShutdownEnable = 
getBoolean(KafkaConfig.ControlledShutdownEnableProp)
 
   /** ********* Feature configuration ***********/
-  def isFeatureVersioningSupported = interBrokerProtocolVersion.isFeatureVersioningSupported()
+  def isFeatureVersioningSupported = interBrokerProtocolVersion.isFeatureVersioningSupported
 
   /** ********* Group coordinator configuration ***********/
   val groupMinSessionTimeoutMs = getInt(KafkaConfig.GroupMinSessionTimeoutMsProp)
@@ -2038,12 +2038,12 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   def controlPlaneListenerName = getControlPlaneListenerNameAndSecurityProtocol.map { case (listenerName, _) => listenerName }
   def controlPlaneSecurityProtocol = getControlPlaneListenerNameAndSecurityProtocol.map { case (_, securityProtocol) => securityProtocol }
   def saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp)
-  val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion.isSaslInterBrokerHandshakeRequestEnabled()
+  val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion.isSaslInterBrokerHandshakeRequestEnabled
 
   /** ********* DelegationToken Configuration **************/
   val delegationTokenSecretKey = Option(getPassword(KafkaConfig.DelegationTokenSecretKeyProp))
     .getOrElse(getPassword(KafkaConfig.DelegationTokenSecretKeyAliasProp))
-  val tokenAuthEnabled = (delegationTokenSecretKey != null && !delegationTokenSecretKey.value.isEmpty)
+  val tokenAuthEnabled = delegationTokenSecretKey != null && delegationTokenSecretKey.value.nonEmpty
   val delegationTokenMaxLifeMs = getLong(KafkaConfig.DelegationTokenMaxLifeTimeProp)
   val delegationTokenExpiryTimeMs = getLong(KafkaConfig.DelegationTokenExpiryTimeMsProp)
   val delegationTokenExpiryCheckIntervalMs = getLong(KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp)
@@ -2222,7 +2222,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
 
   // Topic IDs are used with all self-managed quorum clusters and ZK cluster with IBP greater than or equal to 2.8
   def usesTopicId: Boolean =
-    usesSelfManagedQuorum || interBrokerProtocolVersion.isTopicIdsSupported()
+    usesSelfManagedQuorum || interBrokerProtocolVersion.isTopicIdsSupported
 
 
   val isRemoteLogStorageSystemEnabled: lang.Boolean = getBoolean(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP)
@@ -2310,7 +2310,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
     def validateAdvertisedListenersNonEmptyForBroker(): Unit = {
       require(advertisedListenerNames.nonEmpty,
         "There must be at least one advertised listener." + (
-          if (processRoles.contains(BrokerRole)) s" Perhaps all listeners appear in ${ControllerListenerNamesProp}?" else ""))
+          if (processRoles.contains(BrokerRole)) s" Perhaps all listeners appear in $ControllerListenerNamesProp?" else ""))
     }
     if (processRoles == Set(BrokerRole)) {
       // KRaft broker-only
@@ -2434,11 +2434,11 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
       s"${KafkaConfig.QueuedMaxBytesProp} must be larger or equal to ${KafkaConfig.SocketRequestMaxBytesProp}")
 
     if (maxConnectionsPerIp == 0)
-      require(!maxConnectionsPerIpOverrides.isEmpty, s"${KafkaConfig.MaxConnectionsPerIpProp} can be set to zero only if" +
+      require(maxConnectionsPerIpOverrides.nonEmpty, s"${KafkaConfig.MaxConnectionsPerIpProp} can be set to zero only if" +
         s" ${KafkaConfig.MaxConnectionsPerIpOverridesProp} property is set.")
 
     val invalidAddresses = maxConnectionsPerIpOverrides.keys.filterNot(address => Utils.validHostPattern(address))
-    if (!invalidAddresses.isEmpty)
+    if (invalidAddresses.nonEmpty)
       throw new IllegalArgumentException(s"${KafkaConfig.MaxConnectionsPerIpOverridesProp} contains invalid addresses : ${invalidAddresses.mkString(",")}")
 
     if (connectionsMaxIdleMs >= 0)
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 92d4274bcb6..c28cfc6f543 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -120,10 +120,10 @@ object KafkaRaftServer {
 
   sealed trait ProcessRole
   case object BrokerRole extends ProcessRole {
-    override def toString(): String = "broker"
+    override def toString: String = "broker"
   }
   case object ControllerRole extends ProcessRole {
-    override def toString(): String = "controller"
+    override def toString: String = "controller"
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
index 97af1688482..ec4425de9e6 100644
--- a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
+++ b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
@@ -117,7 +117,7 @@ class PartitionMetadataFile(val file: File,
             try {
               writer.write(PartitionMetadataFileFormatter.toFile(new PartitionMetadata(CurrentVersion, topicId)))
               writer.flush()
-              fileOutputStream.getFD().sync()
+              fileOutputStream.getFD.sync()
             } finally {
               writer.close()
             }
diff --git a/core/src/main/scala/kafka/server/metadata/AclPublisher.scala b/core/src/main/scala/kafka/server/metadata/AclPublisher.scala
index 819fcc3d38d..c33bec98a67 100644
--- a/core/src/main/scala/kafka/server/metadata/AclPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/AclPublisher.scala
@@ -35,7 +35,7 @@ class AclPublisher(
 ) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
   logIdent = s"[${name()}] "
 
-  override def name(): String = s"AclPublisher ${nodeType} id=${nodeId}"
+  override def name(): String = s"AclPublisher $nodeType id=$nodeId"
 
   var completedInitialLoad = false
 
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
index ff183324166..3e4f798abb4 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
@@ -136,7 +136,7 @@ final class BrokerServerMetrics private (
 }
 
 
-final object BrokerServerMetrics {
+object BrokerServerMetrics {
   private val metricGroupName = "broker-metadata-metrics"
 
   private def addMetric[T](metrics: Metrics, name: MetricName)(func: Long => T): Unit = {
diff --git a/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala b/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala
index 59f91970f6b..34e14442b4d 100644
--- a/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala
@@ -35,7 +35,7 @@ class DelegationTokenPublisher(
 
   var _firstPublish = true
 
-  override def name(): String = s"DelegationTokenPublisher ${nodeType} id=${conf.nodeId}"
+  override def name(): String = s"DelegationTokenPublisher $nodeType id=${conf.nodeId}"
 
   override def onMetadataUpdate(
     delta: MetadataDelta,
@@ -58,7 +58,7 @@ class DelegationTokenPublisher(
       if (_firstPublish) {
         // Initialize the tokenCache with the Image
         Option(newImage.delegationTokens()).foreach { delegationTokenImage =>
-          delegationTokenImage.tokens().forEach { (tokenId, delegationTokenData) =>
+          delegationTokenImage.tokens().forEach { (_, delegationTokenData) =>
             tokenManager.updateToken(tokenManager.getDelegationToken(delegationTokenData.tokenInformation()))
           }
         }
@@ -77,7 +77,7 @@ class DelegationTokenPublisher(
       }
     } catch {
       case t: Throwable => faultHandler.handleFault("Uncaught exception while " +
-        s"publishing DelegationToken changes from ${deltaName}", t)
+        s"publishing DelegationToken changes from $deltaName", t)
     }
   }
 }
diff --git a/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala b/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala
index 0ac93a46db1..94aaebf00a6 100644
--- a/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala
@@ -32,7 +32,7 @@ class DynamicClientQuotaPublisher(
 ) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
   logIdent = s"[${name()}] "
 
-  override def name(): String = s"DynamicClientQuotaPublisher ${nodeType} id=${conf.nodeId}"
+  override def name(): String = s"DynamicClientQuotaPublisher $nodeType id=${conf.nodeId}"
 
   override def onMetadataUpdate(
     delta: MetadataDelta,
@@ -53,7 +53,7 @@ class DynamicClientQuotaPublisher(
         }
     } catch {
       case t: Throwable => faultHandler.handleFault("Uncaught exception while " +
-        s"publishing dynamic client quota changes from ${deltaName}", t)
+        s"publishing dynamic client quota changes from $deltaName", t)
     }
   }
 }
diff --git a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
index b5db4e246b6..8f16d6c0513 100644
--- a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
@@ -35,7 +35,7 @@ class DynamicConfigPublisher(
 ) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
   logIdent = s"[${name()}] "
 
-  override def name(): String = s"DynamicConfigPublisher ${nodeType} id=${conf.nodeId}"
+  override def name(): String = s"DynamicConfigPublisher $nodeType id=${conf.nodeId}"
 
   override def onMetadataUpdate(
     delta: MetadataDelta,
@@ -66,7 +66,7 @@ class DynamicConfigPublisher(
                 } catch {
                   case t: Throwable => faultHandler.handleFault("Error updating topic " +
                     s"${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
-                    s"in ${deltaName}", t)
+                    s"in $deltaName", t)
                 }
               )
             case BROKER =>
@@ -81,7 +81,7 @@ class DynamicConfigPublisher(
                   } catch {
                     case t: Throwable => faultHandler.handleFault("Error updating " +
                       s"cluster with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
-                      s"in ${deltaName}", t)
+                      s"in $deltaName", t)
                   }
                 } else if (resource.name() == conf.nodeId.toString) {
                   try {
@@ -97,7 +97,7 @@ class DynamicConfigPublisher(
                   } catch {
                     case t: Throwable => faultHandler.handleFault("Error updating " +
                       s"node with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
-                      s"in ${deltaName}", t)
+                      s"in $deltaName", t)
                   }
                 }
               )
@@ -107,7 +107,7 @@ class DynamicConfigPublisher(
       }
     } catch {
       case t: Throwable => faultHandler.handleFault("Uncaught exception while " +
-        s"publishing dynamic configuration changes from ${deltaName}", t)
+        s"publishing dynamic configuration changes from $deltaName", t)
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala b/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala
index 535ca6e8b57..bacac660e7b 100644
--- a/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala
@@ -33,7 +33,7 @@ class ScramPublisher(
 ) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
   logIdent = s"[${name()}] "
 
-  override def name(): String = s"ScramPublisher ${nodeType} id=${conf.nodeId}"
+  override def name(): String = s"ScramPublisher $nodeType id=${conf.nodeId}"
 
   override def onMetadataUpdate(
     delta: MetadataDelta,
@@ -65,7 +65,7 @@ class ScramPublisher(
       }
     } catch {
       case t: Throwable => faultHandler.handleFault("Uncaught exception while " +
-        s"publishing SCRAM changes from ${deltaName}", t)
+        s"publishing SCRAM changes from $deltaName", t)
     }
   }
 }
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 9307a5d3996..88e9f8aa2a8 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -94,7 +94,7 @@ object CoreUtils {
    */
   def registerMBean(mbean: Object, name: String): Boolean = {
     try {
-      val mbs = ManagementFactory.getPlatformMBeanServer()
+      val mbs = ManagementFactory.getPlatformMBeanServer
       mbs synchronized {
         val objName = new ObjectName(name)
         if (mbs.isRegistered(objName))
@@ -141,7 +141,7 @@ object CoreUtils {
    * Create an instance of the class with the given class name
    */
   def createObject[T <: AnyRef](className: String, args: AnyRef*): T = {
-    val klass = Class.forName(className, true, Utils.getContextOrKafkaClassLoader()).asInstanceOf[Class[T]]
+    val klass = Class.forName(className, true, Utils.getContextOrKafkaClassLoader).asInstanceOf[Class[T]]
     val constructor = klass.getConstructor(args.map(_.getClass): _*)
     constructor.newInstance(args: _*)
   }
@@ -173,7 +173,7 @@ object CoreUtils {
   }
 
   def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol]): Seq[EndPoint] = {
-    listenerListToEndPoints(listeners, securityProtocolMap, true)
+    listenerListToEndPoints(listeners, securityProtocolMap, requireDistinctPorts = true)
   }
 
   def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = {
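
The listenerListToEndPoints overload above now passes requireDistinctPorts = true by name, so the bare literal is self-describing at the call site. The same idea with a hypothetical helper (not the real Kafka signature):

    // A boolean parameter whose meaning is invisible at an unnamed call site.
    def parseEndpoints(listeners: String, requireDistinctPorts: Boolean): Seq[String] = {
      val parts = listeners.split(",").toSeq
      if (requireDistinctPorts) parts.distinct else parts
    }

    parseEndpoints("a:1,b:2", true)                         // what does `true` mean here?
    parseEndpoints("a:1,b:2", requireDistinctPorts = true)  // intent is explicit
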
diff --git a/core/src/main/scala/kafka/utils/Mx4jLoader.scala b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
index e49f3a57f1e..5fbbebed475 100644
--- a/core/src/main/scala/kafka/utils/Mx4jLoader.scala
+++ b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
@@ -33,15 +33,15 @@ import javax.management.ObjectName
 object Mx4jLoader extends Logging {
 
   def maybeLoad(): Boolean = {
-    val props = new VerifiableProperties(System.getProperties())
-    if (!props.getBoolean("kafka_mx4jenable", false))
+    val props = new VerifiableProperties(System.getProperties)
+    if (!props.getBoolean("kafka_mx4jenable", default = false))
       return false
     val address = props.getString("mx4jaddress", "0.0.0.0")
     val port = props.getInt("mx4jport", 8082)
     try {
       debug("Will try to load MX4j now, if it's in the classpath")
 
-      val mbs = ManagementFactory.getPlatformMBeanServer()
+      val mbs = ManagementFactory.getPlatformMBeanServer
       val processorName = new ObjectName("Server:name=XSLTProcessor")
 
       val httpAdaptorClass = Class.forName("mx4j.tools.adaptor.http.HttpAdaptor")
diff --git a/core/src/main/scala/kafka/utils/PasswordEncoder.scala b/core/src/main/scala/kafka/utils/PasswordEncoder.scala
index 1d89e3fe021..d4737be08ce 100644
--- a/core/src/main/scala/kafka/utils/PasswordEncoder.scala
+++ b/core/src/main/scala/kafka/utils/PasswordEncoder.scala
@@ -78,7 +78,7 @@ class NoOpPasswordEncoder extends PasswordEncoder {
   * @param keyLength Key length used for encoding. This should be valid for the specified algorithms.
   * @param iterations Iteration count used for encoding.
   *
-  * The provided `keyFactoryAlgorithm`, 'cipherAlgorithm`, `keyLength` and `iterations` are used for encoding passwords.
+  * The provided `keyFactoryAlgorithm`, `cipherAlgorithm`, `keyLength` and `iterations` are used for encoding passwords.
   * The values used for encoding are stored along with the encoded password and the stored values are used for decoding.
   *
   */
diff --git a/core/src/main/scala/kafka/utils/Throttler.scala b/core/src/main/scala/kafka/utils/Throttler.scala
index 824c7dcfc28..1e77624a9fb 100644
--- a/core/src/main/scala/kafka/utils/Throttler.scala
+++ b/core/src/main/scala/kafka/utils/Throttler.scala
@@ -54,7 +54,7 @@ class Throttler(@volatile var desiredRatePerSec: Double,
   def maybeThrottle(observed: Double): Unit = {
     val msPerSec = TimeUnit.SECONDS.toMillis(1)
     val nsPerSec = TimeUnit.SECONDS.toNanos(1)
-    val currentDesiredRatePerSec = desiredRatePerSec;
+    val currentDesiredRatePerSec = desiredRatePerSec
 
     meter.mark(observed.toLong)
     lock synchronized {
@@ -83,7 +83,7 @@ class Throttler(@volatile var desiredRatePerSec: Double,
   }
 
   def updateDesiredRatePerSec(updatedDesiredRatePerSec: Double): Unit = {
-    desiredRatePerSec = updatedDesiredRatePerSec;
+    desiredRatePerSec = updatedDesiredRatePerSec
   }
 }
 
diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala b/core/src/main/scala/kafka/utils/ToolsUtils.scala
index 10586317f65..8f3ae49b7aa 100644
--- a/core/src/main/scala/kafka/utils/ToolsUtils.scala
+++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala
@@ -32,7 +32,7 @@ object ToolsUtils {
     val validHostPort = hostPorts.filter { hostPortData =>
       org.apache.kafka.common.utils.Utils.getPort(hostPortData) != null
     }
-    val isValid = !validHostPort.isEmpty && validHostPort.size == hostPorts.length
+    val isValid = !validHostPort.isEmpty && validHostPort.length == hostPorts.length
     if (!isValid)
       CommandLineUtils.printUsageAndExit(parser, "Please provide valid host:port like host1:9091,host2:9092\n ")
   }
diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
index 462a7436bb1..54490e3dd65 100755
--- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala
+++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
@@ -161,7 +161,7 @@ class VerifiableProperties(val props: Properties) extends Logging {
     }
   }
 
-  def getBoolean(name: String) = getString(name).toBoolean
+  def getBoolean(name: String): Boolean = getString(name).toBoolean
 
   /**
    * Get a string property, or, if no such property is defined, return the given default value

