This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2ed09db6c4 MINOR: Scala cleanups in core (#12058)
2ed09db6c4 is described below
commit 2ed09db6c497185584e7e33044d0ce1945a61681
Author: Mickael Maison <[email protected]>
AuthorDate: Wed Apr 20 15:10:46 2022 +0200
MINOR: Scala cleanups in core (#12058)
Reviewers: Luke Chen <[email protected]>, Divij Vaidya <[email protected]>
---
.../kafka/coordinator/group/GroupCoordinator.scala | 2 +-
...TransactionMarkerRequestCompletionHandler.scala | 2 +-
core/src/main/scala/kafka/log/AbstractIndex.scala | 2 +-
core/src/main/scala/kafka/log/LocalLog.scala | 8 ++---
core/src/main/scala/kafka/log/LogCleaner.scala | 2 +-
core/src/main/scala/kafka/log/LogConfig.scala | 4 +--
core/src/main/scala/kafka/log/LogLoader.scala | 2 +-
core/src/main/scala/kafka/log/OffsetIndex.scala | 2 +-
core/src/main/scala/kafka/log/TimeIndex.scala | 2 +-
.../kafka/metrics/KafkaCSVMetricsReporter.scala | 2 +-
.../scala/kafka/metrics/KafkaMetricsGroup.scala | 2 +-
.../kafka/metrics/LinuxIoMetricsCollector.scala | 4 +--
.../main/scala/kafka/raft/KafkaMetadataLog.scala | 4 +--
.../scala/kafka/raft/KafkaNetworkChannel.scala | 2 +-
.../kafka/server/BrokerLifecycleManager.scala | 42 +++++++++++-----------
.../src/main/scala/kafka/server/BrokerServer.scala | 4 +--
.../scala/kafka/server/ClientQuotaManager.scala | 6 ++--
.../main/scala/kafka/server/ConfigHandler.scala | 4 +--
.../main/scala/kafka/server/ControllerApis.scala | 2 +-
.../main/scala/kafka/server/ControllerServer.scala | 2 +-
.../kafka/server/DelegationTokenManager.scala | 12 +++----
.../src/main/scala/kafka/server/FetchSession.scala | 2 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 14 ++++----
core/src/main/scala/kafka/server/KafkaServer.scala | 4 +--
.../scala/kafka/server/PartitionMetadataFile.scala | 5 ++-
.../main/scala/kafka/server/ReplicaManager.scala | 25 +++++++------
.../kafka/server/ReplicationQuotaManager.scala | 2 +-
.../server/metadata/BrokerMetadataListener.scala | 18 +++++-----
.../server/metadata/BrokerMetadataPublisher.scala | 12 +++----
.../metadata/BrokerMetadataSnapshotter.scala | 2 +-
.../metadata/ClientQuotaMetadataManager.scala | 12 +++----
.../kafka/server/metadata/KRaftMetadataCache.scala | 22 ++++++------
.../kafka/server/metadata/ZkConfigRepository.scala | 2 +-
.../main/scala/kafka/utils/CommandLineUtils.scala | 4 +--
core/src/main/scala/kafka/utils/Exit.scala | 2 +-
core/src/main/scala/kafka/utils/FileLock.scala | 2 +-
.../main/scala/kafka/utils/KafkaScheduler.scala | 2 +-
core/src/main/scala/kafka/utils/ToolsUtils.scala | 2 +-
core/src/main/scala/kafka/utils/VersionInfo.scala | 2 +-
39 files changed, 121 insertions(+), 125 deletions(-)
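For context, the diff below is a batch of mechanical Scala style cleanups; no behavior change is intended. As a rough, self-contained sketch of the recurring idioms (hypothetical code, not taken from the patch; the object name and values are illustrative only):

    // Hypothetical sketch of the cleanup idioms applied throughout this patch.
    object ScalaCleanupIdioms extends App {
      val groupId = "console-consumer-42"

      // Prefer nonEmpty over a negated isEmpty or a length check.
      val validGroup = groupId != null && groupId.nonEmpty     // was: !groupId.isEmpty

      // Interpolate simple identifiers without braces and drop stray semicolons.
      println(s"group $groupId valid: $validGroup")            // was: s"group ${groupId}";

      // Give side-effecting defs an explicit Unit return type.
      def shutdown(): Unit = println("shutting down")          // was: def shutdown() = ...

      // Pass method references directly instead of underscore lambdas.
      List("a", "b").foreach(println)                          // was: .foreach(println(_))

      // Drop parentheses when calling parameterless Java getters.
      val logName = new java.io.File("server.log").getName     // was: .getName()

      println(logName)
      shutdown()
    }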
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 22d82dfa6d..6bf337d679 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -1104,7 +1104,7 @@ class GroupCoordinator(val brokerId: Int,
groupId != null
case _ =>
// The remaining APIs are groups using Kafka for group coordination and must have a non-empty groupId
- groupId != null && !groupId.isEmpty
+ groupId != null && groupId.nonEmpty
}
}
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index 848e0fa65c..7a59139b17 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -89,7 +89,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
val writeTxnMarkerResponse = response.responseBody.asInstanceOf[WriteTxnMarkersResponse]
- val responseErrors = writeTxnMarkerResponse.errorsByProducerId;
+ val responseErrors = writeTxnMarkerResponse.errorsByProducerId
for (txnIdAndMarker <- txnIdAndMarkerEntries.asScala) {
val transactionalId = txnIdAndMarker.txnId
val txnMarker = txnIdAndMarker.txnMarkerEntry
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index 31b9f6d8dd..37cd4b9f55 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -188,7 +188,7 @@ abstract class AbstractIndex(@volatile private var _file: File, val baseOffset:
safeForceUnmap()
raf.setLength(roundedNewSize)
_length = roundedNewSize
- mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
+ mmap = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
_maxEntries = mmap.limit() / entrySize
mmap.position(position)
debug(s"Resized ${file.getAbsolutePath} to $roundedNewSize, position
is ${mmap.position()} " +
diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala
index 620f33119a..b0e7b0e446 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -86,7 +86,7 @@ class LocalLog(@volatile private var _dir: File,
private[log] def dir: File = _dir
- private[log] def name: String = dir.getName()
+ private[log] def name: String = dir.getName
private[log] def parentDir: String = _parentDir
@@ -657,9 +657,9 @@ object LocalLog extends Logging {
*/
private[log] def logDeleteDirName(topicPartition: TopicPartition): String = {
val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
- val suffix = s"-${topicPartition.partition()}.${uniqueId}${DeleteDirSuffix}"
+ val suffix = s"-${topicPartition.partition()}.$uniqueId$DeleteDirSuffix"
val prefixLength = Math.min(topicPartition.topic().size, 255 - suffix.size)
- s"${topicPartition.topic().substring(0, prefixLength)}${suffix}"
+ s"${topicPartition.topic().substring(0, prefixLength)}$suffix"
}
/**
@@ -931,7 +931,7 @@ object LocalLog extends Logging {
// if we crash in the middle of this we complete the swap in loadSegments()
if (!isRecoveredSwapFile)
sortedNewSegments.reverse.foreach(_.changeFileSuffixes(CleanedFileSuffix, SwapFileSuffix))
- sortedNewSegments.reverse.foreach(existingSegments.add(_))
+ sortedNewSegments.reverse.foreach(existingSegments.add)
val newSegmentBaseOffsets = sortedNewSegments.map(_.baseOffset).toSet
// delete the old files
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 55d795243d..df20a4a36e 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -486,7 +486,7 @@ private[log] class Cleaner(val id: Int,
/* buffer used for write i/o */
private var writeBuffer = ByteBuffer.allocate(ioBufferSize)
- private val decompressionBufferSupplier = BufferSupplier.create();
+ private val decompressionBufferSupplier = BufferSupplier.create()
require(offsetMap.slots * dupBufferLoadFactor > 1, "offset map is too small to fit in even a single message, so log cleaning will never make progress. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads")
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index e0a2cb463e..3f70f88320 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -145,7 +145,7 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
}
if (localLogRetentionBytes > retentionSize) {
- throw new ConfigException(LogConfig.LocalLogRetentionBytesProp, localLogRetentionBytes, s"Value must not be more than property: ${LogConfig.RetentionBytesProp} value.");
+ throw new ConfigException(LogConfig.LocalLogRetentionBytesProp, localLogRetentionBytes, s"Value must not be more than property: ${LogConfig.RetentionBytesProp} value.")
}
localLogRetentionBytes
@@ -268,7 +268,7 @@ object LogConfig {
private[log] val ServerDefaultHeaderName = "Server Default Property"
- val configsWithNoServerDefaults: Set[String] = Set(RemoteLogStorageEnableProp, LocalLogRetentionMsProp, LocalLogRetentionBytesProp);
+ val configsWithNoServerDefaults: Set[String] = Set(RemoteLogStorageEnableProp, LocalLogRetentionMsProp, LocalLogRetentionBytesProp)
// Package private for testing
private[log] class LogConfigDef(base: ConfigDef) extends ConfigDef(base) {
diff --git a/core/src/main/scala/kafka/log/LogLoader.scala b/core/src/main/scala/kafka/log/LogLoader.scala
index eb9dec7a58..581d016e5e 100644
--- a/core/src/main/scala/kafka/log/LogLoader.scala
+++ b/core/src/main/scala/kafka/log/LogLoader.scala
@@ -392,7 +392,7 @@ class LogLoader(
Some(logEndOffset)
else {
warn(s"Deleting all segments because logEndOffset ($logEndOffset) " +
- s"is smaller than logStartOffset ${logStartOffsetCheckpoint}. " +
+ s"is smaller than logStartOffset $logStartOffsetCheckpoint. " +
"This could happen if segment files were deleted from the file
system.")
removeAndDeleteSegmentsAsync(segments.values)
leaderEpochCache.foreach(_.clearAndFlush())
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index a4183b1715..62afbac930 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -156,7 +156,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
}
}
- override def truncate() = truncateToEntries(0)
+ override def truncate(): Unit = truncateToEntries(0)
override def truncateTo(offset: Long): Unit = {
inLock(lock) {
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
index 779a45138b..2c464d602f 100644
--- a/core/src/main/scala/kafka/log/TimeIndex.scala
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -159,7 +159,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
}
}
- override def truncate() = truncateToEntries(0)
+ override def truncate(): Unit = truncateToEntries(0)
/**
* Remove all entries from the index which have an offset greater than or equal to the given offset.
diff --git a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
index fa3c45a494..607cd188e0 100755
--- a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
@@ -51,7 +51,7 @@ private class KafkaCSVMetricsReporter extends KafkaMetricsReporter
val metricsConfig = new KafkaMetricsConfig(props)
csvDir = new File(props.getString("kafka.csv.metrics.dir", "kafka_metrics"))
Utils.delete(csvDir)
- Files.createDirectories(csvDir.toPath())
+ Files.createDirectories(csvDir.toPath)
underlying = new CsvReporter(KafkaYammerMetrics.defaultRegistry(), csvDir)
if (props.getBoolean("kafka.csv.metrics.reporter.enabled", default = false)) {
initialized = true
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index d174f3ea83..161d1f2f36 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -53,7 +53,7 @@ trait KafkaMetricsGroup extends Logging {
nameBuilder.append(typeName)
- if (name.length > 0) {
+ if (name.nonEmpty) {
nameBuilder.append(",name=")
nameBuilder.append(name)
}
diff --git a/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala b/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
index 17de008580..5a41dbad73 100644
--- a/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
+++ b/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
@@ -87,10 +87,10 @@ class LinuxIoMetricsCollector(procRoot: String, val time: Time, val logger: Logg
}
def usable(): Boolean = {
- if (path.toFile().exists()) {
+ if (path.toFile.exists()) {
updateValues(time.milliseconds())
} else {
- logger.debug(s"disabling IO metrics collection because ${path} does not
exist.")
+ logger.debug(s"disabling IO metrics collection because $path does not
exist.")
false
}
}
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index 1b0aef3fed..dba8975d43 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -368,7 +368,7 @@ final class KafkaMetadataLog private (
val firstBatch = batchIterator.next()
val records = firstBatch.streamingIterator(new BufferSupplier.GrowableBufferSupplier())
if (firstBatch.isControlBatch) {
- val header = ControlRecordUtils.deserializedSnapshotHeaderRecord(records.next());
+ val header = ControlRecordUtils.deserializedSnapshotHeaderRecord(records.next())
Some(header.lastContainedLogTimestamp())
} else {
warn("Did not find control record at beginning of snapshot")
@@ -405,7 +405,7 @@ final class KafkaMetadataLog private (
*
* For the given predicate, we are testing if the snapshot identified by the first argument should be deleted.
*/
- private def cleanSnapshots(predicate: (OffsetAndEpoch) => Boolean): Boolean = {
+ private def cleanSnapshots(predicate: OffsetAndEpoch => Boolean): Boolean = {
if (snapshots.size < 2)
return false
diff --git a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
index d99039132d..c44d57102c 100644
--- a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
+++ b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
@@ -45,7 +45,7 @@ object KafkaNetworkChannel {
// Since we already have the request, we go through a simplified builder
new AbstractRequest.Builder[FetchRequest](ApiKeys.FETCH) {
override def build(version: Short): FetchRequest = new FetchRequest(fetchRequest, version)
- override def toString(): String = fetchRequest.toString
+ override def toString: String = fetchRequest.toString
}
case fetchSnapshotRequest: FetchSnapshotRequestData =>
new FetchSnapshotRequest.Builder(fetchSnapshotRequest)
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index 394c353e45..3ca2704eb3 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -264,7 +264,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
new DeadlineFunction(time.nanoseconds() + initialTimeoutNs),
new RegistrationTimeoutEvent())
sendBrokerRegistration()
- info(s"Incarnation ${incarnationId} of broker ${nodeId} in cluster
${clusterId} " +
+ info(s"Incarnation $incarnationId of broker $nodeId in cluster
$clusterId " +
"is now STARTING.")
}
}
@@ -285,7 +285,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
setListeners(_advertisedListeners).
setRack(rack.orNull)
if (isDebugEnabled) {
- debug(s"Sending broker registration ${data}")
+ debug(s"Sending broker registration $data")
}
_channelManager.sendRequest(new BrokerRegistrationRequest.Builder(data), new BrokerRegistrationResponseHandler())
@@ -294,18 +294,18 @@ class BrokerLifecycleManager(val config: KafkaConfig,
private class BrokerRegistrationResponseHandler extends ControllerRequestCompletionHandler {
override def onComplete(response: ClientResponse): Unit = {
if (response.authenticationException() != null) {
- error(s"Unable to register broker ${nodeId} because of an
authentication exception.",
- response.authenticationException());
+ error(s"Unable to register broker $nodeId because of an authentication
exception.",
+ response.authenticationException())
scheduleNextCommunicationAfterFailure()
} else if (response.versionMismatch() != null) {
- error(s"Unable to register broker ${nodeId} because of an API version
problem.",
- response.versionMismatch());
+ error(s"Unable to register broker $nodeId because of an API version
problem.",
+ response.versionMismatch())
scheduleNextCommunicationAfterFailure()
} else if (response.responseBody() == null) {
- warn(s"Unable to register broker ${nodeId}.")
+ warn(s"Unable to register broker $nodeId.")
scheduleNextCommunicationAfterFailure()
} else if (!response.responseBody().isInstanceOf[BrokerRegistrationResponse]) {
- error(s"Unable to register broker ${nodeId} because the controller returned an " +
+ error(s"Unable to register broker $nodeId because the controller returned an " +
"invalid response type.")
scheduleNextCommunicationAfterFailure()
} else {
@@ -316,11 +316,11 @@ class BrokerLifecycleManager(val config: KafkaConfig,
_brokerEpoch = message.data().brokerEpoch()
registered = true
initialRegistrationSucceeded = true
- info(s"Successfully registered broker ${nodeId} with broker epoch
${_brokerEpoch}")
+ info(s"Successfully registered broker $nodeId with broker epoch
${_brokerEpoch}")
scheduleNextCommunicationImmediately() // Immediately send a
heartbeat
} else {
- info(s"Unable to register broker ${nodeId} because the controller
returned " +
- s"error ${errorCode}")
+ info(s"Unable to register broker $nodeId because the controller
returned " +
+ s"error $errorCode")
scheduleNextCommunicationAfterFailure()
}
}
@@ -341,7 +341,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
setWantFence(!readyToUnfence).
setWantShutDown(_state == BrokerState.PENDING_CONTROLLED_SHUTDOWN)
if (isTraceEnabled) {
- trace(s"Sending broker heartbeat ${data}")
+ trace(s"Sending broker heartbeat $data")
}
_channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), new BrokerHeartbeatResponseHandler())
@@ -350,18 +350,18 @@ class BrokerLifecycleManager(val config: KafkaConfig,
private class BrokerHeartbeatResponseHandler extends ControllerRequestCompletionHandler {
override def onComplete(response: ClientResponse): Unit = {
if (response.authenticationException() != null) {
- error(s"Unable to send broker heartbeat for ${nodeId} because of an " +
- "authentication exception.", response.authenticationException());
+ error(s"Unable to send broker heartbeat for $nodeId because of an " +
+ "authentication exception.", response.authenticationException())
scheduleNextCommunicationAfterFailure()
} else if (response.versionMismatch() != null) {
- error(s"Unable to send broker heartbeat for ${nodeId} because of an
API " +
- "version problem.", response.versionMismatch());
+ error(s"Unable to send broker heartbeat for $nodeId because of an API
" +
+ "version problem.", response.versionMismatch())
scheduleNextCommunicationAfterFailure()
} else if (response.responseBody() == null) {
- warn(s"Unable to send broker heartbeat for ${nodeId}. Retrying.")
+ warn(s"Unable to send broker heartbeat for $nodeId. Retrying.")
scheduleNextCommunicationAfterFailure()
} else if (!response.responseBody().isInstanceOf[BrokerHeartbeatResponse]) {
- error(s"Unable to send broker heartbeat for ${nodeId} because the controller " +
+ error(s"Unable to send broker heartbeat for $nodeId because the controller " +
"returned an invalid response type.")
scheduleNextCommunicationAfterFailure()
} else {
@@ -371,7 +371,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
failedAttempts = 0
_state match {
case BrokerState.STARTING =>
- if (message.data().isCaughtUp()) {
+ if (message.data().isCaughtUp) {
info(s"The broker has caught up. Transitioning from STARTING
to RECOVERY.")
_state = BrokerState.RECOVERY
initialCatchUpFuture.complete(null)
@@ -382,7 +382,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
// there is no recovery work to be done, we start up a bit quicker.
scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS))
case BrokerState.RECOVERY =>
- if (!message.data().isFenced()) {
+ if (!message.data().isFenced) {
info(s"The broker has been unfenced. Transitioning from
RECOVERY to RUNNING.")
_state = BrokerState.RUNNING
} else {
@@ -417,7 +417,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
scheduleNextCommunicationAfterSuccess()
}
} else {
- warn(s"Broker ${nodeId} sent a heartbeat request but received error
${errorCode}.")
+ warn(s"Broker $nodeId sent a heartbeat request but received error
$errorCode.")
scheduleNextCommunicationAfterFailure()
}
}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index c537d99b0f..8fdc59d945 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -64,8 +64,8 @@ class BrokerSnapshotWriterBuilder(raftClient: RaftClient[ApiMessageAndVersion])
raftClient.createSnapshot(committedOffset, committedEpoch, lastContainedLogTime).
asScala.getOrElse(
throw new RuntimeException("A snapshot already exists with " +
- s"committedOffset=${committedOffset},
committedEpoch=${committedEpoch}, " +
- s"lastContainedLogTime=${lastContainedLogTime}")
+ s"committedOffset=$committedOffset, committedEpoch=$committedEpoch, " +
+ s"lastContainedLogTime=$lastContainedLogTime")
)
}
}
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 7334519986..ee7c70bec9 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -593,7 +593,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
if (sanitizedUser != null && clientId != null) {
val userEntity = Some(UserEntity(sanitizedUser))
val clientIdEntity = Some(ClientIdEntity(clientId))
- if (!sanitizedUser.isEmpty && !clientId.isEmpty) {
+ if (sanitizedUser.nonEmpty && clientId.nonEmpty) {
// /config/users/<user>/clients/<client-id>
quota = overriddenQuotas.get(KafkaQuotaEntity(userEntity, clientIdEntity))
if (quota == null) {
@@ -608,14 +608,14 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
// /config/users/<default>/clients/<default>
quota = overriddenQuotas.get(DefaultUserClientIdQuotaEntity)
}
- } else if (!sanitizedUser.isEmpty) {
+ } else if (sanitizedUser.nonEmpty) {
// /config/users/<user>
quota = overriddenQuotas.get(KafkaQuotaEntity(userEntity, None))
if (quota == null) {
// /config/users/<default>
quota = overriddenQuotas.get(DefaultUserQuotaEntity)
}
- } else if (!clientId.isEmpty) {
+ } else if (clientId.nonEmpty) {
// /config/clients/<client-id>
quota = overriddenQuotas.get(KafkaQuotaEntity(None, clientIdEntity))
if (quota == null) {
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 2fe49ad17b..ca6a42b824 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -64,7 +64,7 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
}
logManager.updateTopicConfig(topic, props)
- def updateThrottledList(prop: String, quotaManager: ReplicationQuotaManager) = {
+ def updateThrottledList(prop: String, quotaManager: ReplicationQuotaManager): Unit = {
if (topicConfig.containsKey(prop) && topicConfig.getProperty(prop).nonEmpty) {
val partitions = parseThrottledPartitions(topicConfig, kafkaConfig.brokerId, prop)
quotaManager.markThrottled(topic, partitions)
@@ -177,7 +177,7 @@ class UserConfigHandler(private val quotaManagers: QuotaManagers, val credential
val sanitizedUser = entities(0)
val sanitizedClientId = if (entities.length == 3) Some(entities(2)) else None
updateQuotaConfig(Some(sanitizedUser), sanitizedClientId, config)
- if (!sanitizedClientId.isDefined && sanitizedUser != ConfigEntityName.Default)
+ if (sanitizedClientId.isEmpty && sanitizedUser != ConfigEntityName.Default)
credentialProvider.updateCredentials(Sanitizer.desanitize(sanitizedUser), config)
}
}
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 064974b7db..3bd4ca5fce 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -112,7 +112,7 @@ class ControllerApis(val requestChannel: RequestChannel,
} catch {
case e: FatalExitError => throw e
case e: Throwable => {
- val t = if (e.isInstanceOf[ExecutionException]) e.getCause() else e
+ val t = if (e.isInstanceOf[ExecutionException]) e.getCause else e
error(s"Unexpected error handling request ${request.requestDesc(true)}
" +
s"with context ${request.context}", t)
requestHelper.handleError(request, t)
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 049ad5b8c2..498402370b 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -150,7 +150,7 @@ class ControllerServer(
socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
config.controllerListeners.head.listenerName))
} else {
- throw new ConfigException("No controller.listener.names defined for
controller");
+ throw new ConfigException("No controller.listener.names defined for
controller")
}
val threadNamePrefixAsString = threadNamePrefix.getOrElse("")
diff --git a/core/src/main/scala/kafka/server/DelegationTokenManager.scala b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
index 536a296383..8b96e72543 100644
--- a/core/src/main/scala/kafka/server/DelegationTokenManager.scala
+++ b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
@@ -121,7 +121,7 @@ object DelegationTokenManager {
require(mainJs(VersionKey).to[Int] == CurrentVersion)
val owner = SecurityUtils.parseKafkaPrincipal(Sanitizer.desanitize(mainJs(OwnerKey).to[String]))
val renewerStr = mainJs(RenewersKey).to[Seq[String]]
- val renewers = renewerStr.map(Sanitizer.desanitize(_)).map(SecurityUtils.parseKafkaPrincipal(_))
+ val renewers = renewerStr.map(Sanitizer.desanitize).map(SecurityUtils.parseKafkaPrincipal)
val issueTimestamp = mainJs(IssueTimestampKey).to[Long]
val expiryTimestamp = mainJs(ExpiryTimestampKey).to[Long]
val maxTimestamp = mainJs(MaxTimestampKey).to[Long]
@@ -140,13 +140,13 @@ object DelegationTokenManager {
val allow =
//exclude tokens which are not requested
- if (!owners.isEmpty && !owners.get.exists(owner => token.ownerOrRenewer(owner))) {
+ if (owners.isDefined && !owners.get.exists(owner => token.ownerOrRenewer(owner))) {
false
//Owners and the renewers can describe their own tokens
} else if (token.ownerOrRenewer(requestedPrincipal)) {
true
// Check permission for non-owned tokens
- } else if ((authorizeToken(token.tokenId))) {
+ } else if (authorizeToken(token.tokenId)) {
true
}
else {
@@ -172,7 +172,7 @@ class DelegationTokenManager(val config: KafkaConfig,
val secretKey = {
val keyBytes = if (config.tokenAuthEnabled) config.delegationTokenSecretKey.value.getBytes(StandardCharsets.UTF_8) else null
- if (keyBytes == null || keyBytes.length == 0) null
+ if (keyBytes == null || keyBytes.isEmpty) null
else
createSecretKey(keyBytes)
}
@@ -183,7 +183,7 @@ class DelegationTokenManager(val config: KafkaConfig,
private val lock = new Object()
private var tokenChangeListener: ZkNodeChangeNotificationListener = null
- def startup() = {
+ def startup(): Unit = {
if (config.tokenAuthEnabled) {
zkClient.createDelegationTokenPaths()
loadCache()
@@ -192,7 +192,7 @@ class DelegationTokenManager(val config: KafkaConfig,
}
}
- def shutdown() = {
+ def shutdown(): Unit = {
if (config.tokenAuthEnabled) {
if (tokenChangeListener != null) tokenChangeListener.close()
}
diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala
index f7d348ddc5..b32cb8bcf6 100644
--- a/core/src/main/scala/kafka/server/FetchSession.scala
+++ b/core/src/main/scala/kafka/server/FetchSession.scala
@@ -790,7 +790,7 @@ class FetchManager(private val time: Time,
new FullFetchContext(time, cache, reqMetadata, fetchData, reqVersion >= 13, isFollower)
}
debug(s"Created a new full FetchContext with
${partitionsToLogString(fetchData.keySet)}."+
- s"${removedFetchSessionStr}${suffix}")
+ s"$removedFetchSessionStr$suffix")
context
} else {
cache.synchronized {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 86295816f1..5793a87dc7 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1226,7 +1226,7 @@ class KafkaApis(val requestChannel: RequestChannel,
var unauthorizedForCreateTopics = Set[String]()
if (authorizedTopics.nonEmpty) {
- val nonExistingTopics = authorizedTopics.filterNot(metadataCache.contains(_))
+ val nonExistingTopics = authorizedTopics.filterNot(metadataCache.contains)
if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
if (!authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)) {
val authorizedForCreateTopics = authHelper.filterByAuthorized(request.context, CREATE, TOPIC,
@@ -1350,7 +1350,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val payloadOpt = zkSupport.zkClient.getConsumerOffset(offsetFetchRequest.groupId, topicPartition)
payloadOpt match {
case Some(payload) =>
- (topicPartition, new OffsetFetchResponse.PartitionData(payload.toLong,
+ (topicPartition, new OffsetFetchResponse.PartitionData(payload,
Optional.empty(), OffsetFetchResponse.NO_METADATA, Errors.NONE))
case None =>
(topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
@@ -1615,7 +1615,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val listedGroup = new ListGroupsResponseData.ListedGroup()
.setGroupId(group.groupId)
.setProtocolType(group.protocolType)
- .setGroupState(group.state.toString)
+ .setGroupState(group.state)
listedGroup
}.asJava)
.setThrottleTimeMs(throttleMs)
@@ -2156,16 +2156,14 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new DeleteRecordsResponse(new DeleteRecordsResponseData()
.setThrottleTimeMs(requestThrottleMs)
- .setTopics(new DeleteRecordsResponseData.DeleteRecordsTopicResultCollection(mergedResponseStatus.groupBy(_._1.topic).map { case (topic, partitionMap) => {
+ .setTopics(new DeleteRecordsResponseData.DeleteRecordsTopicResultCollection(mergedResponseStatus.groupBy(_._1.topic).map { case (topic, partitionMap) =>
new DeleteRecordsTopicResult()
.setName(topic)
- .setPartitions(new DeleteRecordsResponseData.DeleteRecordsPartitionResultCollection(partitionMap.map { case (topicPartition, partitionResult) => {
+ .setPartitions(new DeleteRecordsResponseData.DeleteRecordsPartitionResultCollection(partitionMap.map { case (topicPartition, partitionResult) =>
new DeleteRecordsPartitionResult().setPartitionIndex(topicPartition.partition)
.setLowWatermark(partitionResult.lowWatermark)
.setErrorCode(partitionResult.errorCode)
- }
}.toList.asJava.iterator()))
- }
}.toList.asJava.iterator()))))
}
@@ -3356,7 +3354,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.setThrottleTimeMs(requestThrottleMs)
.setClusterId(clusterId)
.setControllerId(controllerId)
- .setClusterAuthorizedOperations(clusterAuthorizedOperations);
+ .setClusterAuthorizedOperations(clusterAuthorizedOperations)
brokers.foreach { broker =>
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 28cb1cbc6f..d006165577 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -212,7 +212,7 @@ class KafkaServer(
/* Get or create cluster_id */
_clusterId = getOrGenerateClusterId(zkClient)
- info(s"Cluster ID = ${clusterId}")
+ info(s"Cluster ID = $clusterId")
/* load metadata */
val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) =
@@ -227,7 +227,7 @@ class KafkaServer(
/* check cluster id */
if (preloadedBrokerMetadataCheckpoint.clusterId.isDefined && preloadedBrokerMetadataCheckpoint.clusterId.get != clusterId)
throw new InconsistentClusterIdException(
- s"The Cluster ID ${clusterId} doesn't match stored clusterId
${preloadedBrokerMetadataCheckpoint.clusterId} in meta.properties. " +
+ s"The Cluster ID $clusterId doesn't match stored clusterId
${preloadedBrokerMetadataCheckpoint.clusterId} in meta.properties. " +
s"The broker is trying to join the wrong cluster. Configured
zookeeper.connect may be wrong.")
/* generate brokerId */
diff --git a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
index 749b6dd66f..f88a4cc907 100644
--- a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
+++ b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
@@ -44,8 +44,7 @@ object PartitionMetadataFile {
}
class PartitionMetadataReadBuffer[T](location: String,
- reader: BufferedReader,
- version: Int) extends Logging {
+ reader: BufferedReader) extends Logging {
def read(): PartitionMetadata = {
def malformedLineException(line: String) =
new IOException(s"Malformed line in checkpoint file ($location):
'$line'")
@@ -141,7 +140,7 @@ class PartitionMetadataFile(val file: File,
try {
val reader = Files.newBufferedReader(path)
try {
- val partitionBuffer = new PartitionMetadataReadBuffer(file.getAbsolutePath, reader, CurrentVersion)
+ val partitionBuffer = new PartitionMetadataReadBuffer(file.getAbsolutePath, reader)
partitionBuffer.read()
} finally {
reader.close()
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index eff4824c47..831212f9eb 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -711,7 +711,7 @@ class ReplicaManager(val config: KafkaConfig,
/* If the topic name is exceptionally long, we can't support altering the log directory.
* See KAFKA-4893 for details.
* TODO: fix this by implementing topic IDs. */
- if (UnifiedLog.logFutureDirName(topicPartition).size > 255)
+ if (UnifiedLog.logFutureDirName(topicPartition).length > 255)
throw new InvalidTopicException("The topic name is too long.")
if (!logManager.isLogDirOnline(destinationDir))
throw new KafkaStorageException(s"Log directory $destinationDir is
offline")
@@ -2082,28 +2082,27 @@ class ReplicaManager(val config: KafkaConfig,
topicId: Uuid): Option[(Partition, Boolean)] = {
getPartition(tp) match {
case HostedPartition.Offline =>
- stateChangeLogger.warn(s"Unable to bring up new local leader ${tp} " +
- s"with topic id ${topicId} because it resides in an offline log " +
+ stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
+ s"with topic id $topicId because it resides in an offline log " +
"directory.")
None
- case HostedPartition.Online(partition) => {
+ case HostedPartition.Online(partition) =>
if (partition.topicId.exists(_ != topicId)) {
// Note: Partition#topicId will be None here if the Log object for this partition
// has not been created.
- throw new IllegalStateException(s"Topic ${tp} exists, but its ID is
" +
- s"${partition.topicId.get}, not ${topicId} as expected")
+ throw new IllegalStateException(s"Topic $tp exists, but its ID is " +
+ s"${partition.topicId.get}, not $topicId as expected")
}
Some(partition, false)
- }
case HostedPartition.None =>
if (delta.image().topicsById().containsKey(topicId)) {
- stateChangeLogger.error(s"Expected partition ${tp} with topic id " +
- s"${topicId} to exist, but it was missing. Creating...")
+ stateChangeLogger.error(s"Expected partition $tp with topic id " +
+ s"$topicId to exist, but it was missing. Creating...")
} else {
- stateChangeLogger.info(s"Creating new partition ${tp} with topic id
" +
- s"${topicId}.")
+ stateChangeLogger.info(s"Creating new partition $tp with topic id " +
+ s"$topicId.")
}
// it's a partition that we don't know about yet, so create it and mark it online
val partition = Partition(tp, time, this)
@@ -2130,10 +2129,10 @@ class ReplicaManager(val config: KafkaConfig,
stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).")
stopPartitions(deletes).forKeyValue { (topicPartition, e) =>
if (e.isInstanceOf[KafkaStorageException]) {
- stateChangeLogger.error(s"Unable to delete replica
${topicPartition} because " +
+ stateChangeLogger.error(s"Unable to delete replica $topicPartition
because " +
"the local replica for the partition is in an offline log
directory")
} else {
- stateChangeLogger.error(s"Unable to delete replica
${topicPartition} because " +
+ stateChangeLogger.error(s"Unable to delete replica $topicPartition
because " +
s"we got an unexpected ${e.getClass.getName} exception:
${e.getMessage}")
}
}
diff --git a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
index 3035cb1371..c02936973d 100644
--- a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
@@ -79,7 +79,7 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig,
private var quota: Quota = null
private val sensorAccess = new SensorAccess(lock, metrics)
private val rateMetricName = metrics.metricName("byte-rate", replicationType.toString,
- s"Tracking byte-rate for ${replicationType}")
+ s"Tracking byte-rate for $replicationType")
/**
* Update the quota
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 5b71409714..54a777f67f 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -41,7 +41,7 @@ class BrokerMetadataListener(
val maxBytesBetweenSnapshots: Long,
val snapshotter: Option[MetadataSnapshotter]
) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
- private val logContext = new LogContext(s"[BrokerMetadataListener
id=${brokerId}] ")
+ private val logContext = new LogContext(s"[BrokerMetadataListener
id=$brokerId] ")
private val log = logContext.logger(classOf[BrokerMetadataListener])
logIdent = logContext.logPrefix()
@@ -110,7 +110,7 @@ class BrokerMetadataListener(
val results = try {
val loadResults = loadBatches(_delta, reader, None, None, None)
if (isDebugEnabled) {
- debug(s"Loaded new commits: ${loadResults}")
+ debug(s"Loaded new commits: $loadResults")
}
loadResults
} finally {
@@ -154,7 +154,7 @@ class BrokerMetadataListener(
)
_delta.finishSnapshot()
info(s"Loaded snapshot
${reader.snapshotId().offset}-${reader.snapshotId().epoch}: " +
- s"${loadResults}")
+ s"$loadResults")
} finally {
reader.close()
}
@@ -163,9 +163,9 @@ class BrokerMetadataListener(
}
case class BatchLoadResults(numBatches: Int, numRecords: Int, elapsedUs: Long, numBytes: Long) {
- override def toString(): String = {
- s"${numBatches} batch(es) with ${numRecords} record(s) in ${numBytes}
bytes " +
- s"ending at offset ${highestMetadataOffset} in ${elapsedUs}
microseconds"
+ override def toString: String = {
+ s"$numBatches batch(es) with $numRecords record(s) in $numBytes bytes " +
+ s"ending at offset $highestMetadataOffset in $elapsedUs microseconds"
}
}
@@ -194,7 +194,7 @@ class BrokerMetadataListener(
var numRecords = 0
var numBytes = 0L
- while (iterator.hasNext()) {
+ while (iterator.hasNext) {
val batch = iterator.next()
val epoch = lastCommittedEpoch.getOrElse(batch.epoch())
@@ -236,7 +236,7 @@ class BrokerMetadataListener(
override def run(): Unit = {
_publisher = Some(publisher)
- log.info(s"Starting to publish metadata events at offset
${highestMetadataOffset}.")
+ log.info(s"Starting to publish metadata events at offset
$highestMetadataOffset.")
try {
publish(publisher)
future.complete(null)
@@ -271,7 +271,7 @@ class BrokerMetadataListener(
_image = _delta.apply()
_delta = new MetadataDelta(_image)
if (isDebugEnabled) {
- debug(s"Publishing new metadata delta ${delta} at offset
${_image.highestOffsetAndEpoch().offset}.")
+ debug(s"Publishing new metadata delta $delta at offset
${_image.highestOffsetAndEpoch().offset}.")
}
publisher.publish(delta, _image)
}
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 291a1507d2..e653e6e5b2 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -111,7 +111,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
/**
* The broker ID.
*/
- val brokerId = conf.nodeId
+ val brokerId: Int = conf.nodeId
/**
* True if this is the first time we have published metadata.
@@ -199,7 +199,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
processConfigChanges(ConfigEntityName.Default, props)
} else if (resource.name() == brokerId.toString) {
// Apply changes to this broker's dynamic configuration.
- info(s"Updating broker ${brokerId} with new configuration : " +
+ info(s"Updating broker $brokerId with new configuration : " +
toLoggableProps(resource, props).mkString(","))
dynamicConfigHandlers(ConfigType.Broker).
processConfigChanges(resource.name(), props)
@@ -227,7 +227,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
// there could be a window during which incorrect authorization results are returned.
Option(delta.aclsDelta()).foreach( aclsDelta =>
_authorizer match {
- case Some(authorizer: ClusterMetadataAuthorizer) => if (aclsDelta.isSnapshotDelta()) {
+ case Some(authorizer: ClusterMetadataAuthorizer) => if (aclsDelta.isSnapshotDelta) {
// If the delta resulted from a snapshot load, we want to apply the new changes
// all at once using ClusterMetadataAuthorizer#loadSnapshot. If this is the
// first snapshot load, it will also complete the futures returned by
@@ -237,10 +237,10 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
// Because the changes map is a LinkedHashMap, the deltas will be returned in
// the order they were performed.
aclsDelta.changes().entrySet().forEach(e =>
- if (e.getValue().isPresent()) {
- authorizer.addAcl(e.getKey(), e.getValue().get())
+ if (e.getValue.isPresent) {
+ authorizer.addAcl(e.getKey, e.getValue.get())
} else {
- authorizer.removeAcl(e.getKey())
+ authorizer.removeAcl(e.getKey)
})
}
case _ => // No ClusterMetadataAuthorizer is configured. There is nothing to do.
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
index fb5bfbbd81..f32c4d3238 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
@@ -38,7 +38,7 @@ class BrokerMetadataSnapshotter(
threadNamePrefix: Option[String],
writerBuilder: SnapshotWriterBuilder
) extends Logging with MetadataSnapshotter {
- private val logContext = new LogContext(s"[BrokerMetadataSnapshotter
id=${brokerId}] ")
+ private val logContext = new LogContext(s"[BrokerMetadataSnapshotter
id=$brokerId] ")
logIdent = logContext.logPrefix()
/**
diff --git a/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala b/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala
index 6ada6b258c..3f4b136fb9 100644
--- a/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala
+++ b/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala
@@ -98,10 +98,10 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag
}
}
quotaDelta.changes().entrySet().forEach { e =>
- handleUserClientQuotaChange(userClientEntity, e.getKey(), e.getValue().asScala.map(_.toDouble))
+ handleUserClientQuotaChange(userClientEntity, e.getKey, e.getValue.asScala.map(_.toDouble))
}
} else {
- warn(s"Ignoring unsupported quota entity ${entity}.")
+ warn(s"Ignoring unsupported quota entity $entity.")
}
}
@@ -119,10 +119,10 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag
quotaDelta.changes().entrySet().forEach { e =>
// The connection quota only understands the connection rate limit
- val quotaName = e.getKey()
- val quotaValue = e.getValue()
+ val quotaName = e.getKey
+ val quotaValue = e.getValue
if (!quotaName.equals(QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG)) {
- warn(s"Ignoring unexpected quota key ${quotaName} for entity
$ipEntity")
+ warn(s"Ignoring unexpected quota key $quotaName for entity $ipEntity")
} else {
try {
connectionQuotas.updateIpConnectionRateQuota(inetAddress, quotaValue.asScala.map(_.toInt))
@@ -140,7 +140,7 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag
case QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG => quotaManagers.request
case QuotaConfigs.CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG => quotaManagers.controllerMutation
case _ =>
- warn(s"Ignoring unexpected quota key ${key} for entity $quotaEntity")
+ warn(s"Ignoring unexpected quota key $key for entity $quotaEntity")
return
}
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 1ff7a8076c..880950b3a8 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -97,10 +97,10 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
maybeLeader match {
case None =>
val error = if (!image.cluster().brokers.containsKey(partition.leader)) {
- debug(s"Error while fetching metadata for ${topicName}-${partitionId}: leader not available")
+ debug(s"Error while fetching metadata for $topicName-$partitionId: leader not available")
Errors.LEADER_NOT_AVAILABLE
} else {
- debug(s"Error while fetching metadata for
${topicName}-${partitionId}: listener $listenerName " +
+ debug(s"Error while fetching metadata for
$topicName-$partitionId: listener $listenerName " +
s"not found on leader ${partition.leader}")
if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else
Errors.LEADER_NOT_AVAILABLE
}
@@ -113,12 +113,12 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
.setIsrNodes(filteredIsr)
.setOfflineReplicas(offlineReplicas)
case Some(leader) =>
- val error = if (filteredReplicas.size < partition.replicas.size) {
- debug(s"Error while fetching metadata for
${topicName}-${partitionId}: replica information not available for " +
+ val error = if (filteredReplicas.size < partition.replicas.length)
{
+ debug(s"Error while fetching metadata for
$topicName-$partitionId: replica information not available for " +
s"following brokers
${partition.replicas.filterNot(filteredReplicas.contains).mkString(",")}")
Errors.REPLICA_NOT_AVAILABLE
- } else if (filteredIsr.size < partition.isr.size) {
- debug(s"Error while fetching metadata for
${topicName}-${partitionId}: in sync replica information not available for " +
+ } else if (filteredIsr.size < partition.isr.length) {
+ debug(s"Error while fetching metadata for
$topicName-$partitionId: in sync replica information not available for " +
s"following brokers
${partition.isr.filterNot(filteredIsr.contains).mkString(",")}")
Errors.REPLICA_NOT_AVAILABLE
} else {
@@ -266,8 +266,8 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
val image = _currentImage
val result = new mutable.HashMap[Int, Node]()
Option(image.topics().getTopic(tp.topic())).foreach { topic =>
- topic.partitions().values().forEach { case partition =>
- partition.replicas.map { case replicaId =>
+ topic.partitions().values().forEach { partition =>
+ partition.replicas.map { replicaId =>
result.put(replicaId, Option(image.cluster().broker(replicaId)) match {
case None => Node.noNode()
case Some(broker) => broker.node(listenerName.value()).asScala.getOrElse(Node.noNode())
@@ -288,7 +288,7 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
*/
private def getRandomAliveBroker(image: MetadataImage): Option[Int] = {
val aliveBrokers = getAliveBrokers(image).toList
- if (aliveBrokers.size == 0) {
+ if (aliveBrokers.isEmpty) {
None
} else {
Some(aliveBrokers(ThreadLocalRandom.current().nextInt(aliveBrokers.size)).id)
@@ -315,8 +315,8 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
image.topics().topicsByName().values().forEach { topic =>
topic.partitions().entrySet().forEach { entry =>
- val partitionId = entry.getKey()
- val partition = entry.getValue()
+ val partitionId = entry.getKey
+ val partition = entry.getValue
partitionInfos.add(new PartitionInfo(topic.name(),
partitionId,
node(partition.leader),
diff --git a/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala b/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala
index 8f8dfcd1a0..16842bcd11 100644
--- a/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala
@@ -39,7 +39,7 @@ class ZkConfigRepository(adminZkClient: AdminZkClient) extends ConfigRepository
}
// ZK stores cluster configs under "<default>".
val effectiveName = if (configResource.`type`.equals(Type.BROKER) &&
- configResource.name.isEmpty()) {
+ configResource.name.isEmpty) {
ConfigEntityName.Default
} else {
configResource.name
diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
index 80726ce06b..1eaee48416 100644
--- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala
+++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
@@ -52,7 +52,7 @@ object CommandLineUtils extends Logging {
* @param commandOpts Acceptable options for a command
* @param message Message to display on successful check
*/
- def printHelpAndExitIfNeeded(commandOpts: CommandDefaultOptions, message: String) = {
+ def printHelpAndExitIfNeeded(commandOpts: CommandDefaultOptions, message: String): Unit = {
if (isPrintHelpNeeded(commandOpts))
printUsageAndDie(commandOpts.parser, message)
if (isPrintVersionNeeded(commandOpts))
@@ -117,7 +117,7 @@ object CommandLineUtils extends Logging {
val props = new Properties
for (a <- splits) {
- if (a.length == 1 || (a.length == 2 && a(1).isEmpty())) {
+ if (a.length == 1 || (a.length == 2 && a(1).isEmpty)) {
if (acceptMissingValue) props.put(a(0), "")
else throw new IllegalArgumentException(s"Missing value for key ${a(0)}")
}
diff --git a/core/src/main/scala/kafka/utils/Exit.scala b/core/src/main/scala/kafka/utils/Exit.scala
index ad17237571..eddd929af5 100644
--- a/core/src/main/scala/kafka/utils/Exit.scala
+++ b/core/src/main/scala/kafka/utils/Exit.scala
@@ -45,7 +45,7 @@ object Exit {
JExit.setHaltProcedure(functionToProcedure(haltProcedure))
def setShutdownHookAdder(shutdownHookAdder: (String, => Unit) => Unit): Unit = {
- JExit.setShutdownHookAdder((name, runnable) => shutdownHookAdder(name, runnable.run))
+ JExit.setShutdownHookAdder((name, runnable) => shutdownHookAdder(name, runnable.run()))
}
def resetExitProcedure(): Unit =
diff --git a/core/src/main/scala/kafka/utils/FileLock.scala b/core/src/main/scala/kafka/utils/FileLock.scala
index c635f76dff..2de16386c9 100644
--- a/core/src/main/scala/kafka/utils/FileLock.scala
+++ b/core/src/main/scala/kafka/utils/FileLock.scala
@@ -73,7 +73,7 @@ class FileLock(val file: File) extends Logging {
/**
* Destroy this lock, closing the associated FileChannel
*/
- def destroy() = {
+ def destroy(): Unit = {
this synchronized {
unlock()
channel.close()
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index bec511b3f7..354652ee6f 100755
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -135,7 +135,7 @@ class KafkaScheduler(val threads: Int,
* Package private for testing.
*/
private[kafka] def taskRunning(task: ScheduledFuture[_]): Boolean = {
- executor.getQueue().contains(task)
+ executor.getQueue.contains(task)
}
def resizeThreadPool(newSize: Int): Unit = {
diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala b/core/src/main/scala/kafka/utils/ToolsUtils.scala
index 0f3de767fd..056545cb03 100644
--- a/core/src/main/scala/kafka/utils/ToolsUtils.scala
+++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
object ToolsUtils {
- def validatePortOrDie(parser: OptionParser, hostPort: String) = {
+ def validatePortOrDie(parser: OptionParser, hostPort: String): Unit = {
val hostPorts: Array[String] = if(hostPort.contains(','))
hostPort.split(",")
else
diff --git a/core/src/main/scala/kafka/utils/VersionInfo.scala b/core/src/main/scala/kafka/utils/VersionInfo.scala
index 9d3130e668..203488a64b 100644
--- a/core/src/main/scala/kafka/utils/VersionInfo.scala
+++ b/core/src/main/scala/kafka/utils/VersionInfo.scala
@@ -35,6 +35,6 @@ object VersionInfo {
}
def getVersionString: String = {
- s"${getVersion} (Commit:${getCommit})"
+ s"$getVersion (Commit:$getCommit)"
}
}