This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 718202dbf4b KAFKA-15853: Delete CoreUtils.scala and migrate logic to
Java (#21289)
718202dbf4b is described below
commit 718202dbf4b5dde65365051566a2d209da83f248
Author: Uladzislau Blok <[email protected]>
AuthorDate: Wed Jan 21 15:48:18 2026 +0100
KAFKA-15853: Delete CoreUtils.scala and migrate logic to Java (#21289)
Reviewers: Mickael Maison <[email protected]>
---
.../java/org/apache/kafka/common/utils/Utils.java | 60 ++++++-
core/src/main/scala/kafka/cluster/Partition.scala | 122 ++++++-------
.../transaction/TransactionStateManager.scala | 59 +++----
core/src/main/scala/kafka/log/LogManager.scala | 8 +-
.../scala/kafka/metrics/KafkaMetricsReporter.scala | 4 +-
.../main/scala/kafka/network/SocketServer.scala | 10 +-
.../main/scala/kafka/raft/KafkaRaftManager.scala | 9 +-
.../scala/kafka/server/AbstractFetcherThread.scala | 59 +++----
.../src/main/scala/kafka/server/BrokerServer.scala | 43 +++--
.../main/scala/kafka/server/ControllerServer.scala | 16 +-
.../scala/kafka/server/DynamicBrokerConfig.scala | 35 ++--
core/src/main/scala/kafka/server/KafkaConfig.scala | 84 ++++++++-
.../main/scala/kafka/server/KafkaRaftServer.scala | 6 +-
.../src/main/scala/kafka/server/SharedServer.scala | 12 +-
.../main/scala/kafka/tools/TestRaftServer.scala | 10 +-
core/src/main/scala/kafka/utils/CoreUtils.scala | 196 ---------------------
core/src/main/scala/kafka/utils/Logging.scala | 3 +-
.../kafka/server/QuorumTestHarness.scala | 16 +-
.../kafka/server/LocalLeaderEndPointTest.scala | 7 +-
.../AbstractCoordinatorConcurrencyTest.scala | 8 +-
.../integration/UncleanLeaderElectionTest.scala | 7 +-
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 4 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 6 +-
.../scala/unit/kafka/server/KafkaConfigTest.scala | 4 +-
.../kafka/server/KafkaMetricsReporterTest.scala | 7 +-
.../unit/kafka/server/LogDirFailureTest.scala | 4 +-
.../server/ReplicaManagerConcurrencyTest.scala | 16 +-
.../unit/kafka/server/ServerShutdownTest.scala | 6 +-
.../scala/unit/kafka/utils/CoreUtilsTest.scala | 76 --------
.../test/scala/unit/kafka/utils/TestUtils.scala | 3 +-
.../org/apache/kafka/server/util/LockUtils.java | 33 ++++
31 files changed, 410 insertions(+), 523 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 8162b75b4d4..51cb86f5c88 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -35,6 +35,7 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
+import java.lang.management.ManagementFactory;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Modifier;
@@ -93,6 +94,9 @@ import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
public final class Utils {
private Utils() {}
@@ -1035,14 +1039,26 @@ public final class Utils {
void run() throws Throwable;
}
- public static void swallow(final Logger log, final Level level, final
String what, final SwallowAction code) {
- swallow(log, level, what, code, null);
+ public static void swallow(final SwallowAction code) {
+ swallow(log, Level.WARN, "Exception while execution the action", code,
null);
+ }
+
+ public static void swallow(final Logger log, final SwallowAction code) {
+ swallow(log, Level.WARN, "Exception while execution the action", code,
null);
+ }
+
+ public static void swallow(final Logger log, final Level level, final
SwallowAction code) {
+ swallow(log, level, "Exception while execution the action", code,
null);
+ }
+
+ public static void swallow(final Logger log, final Level level, final
String errorMessage, final SwallowAction code) {
+ swallow(log, level, errorMessage, code, null);
}
/**
* Run the supplied code. If an exception is thrown, it is swallowed and
registered to the firstException parameter.
*/
- public static void swallow(final Logger log, final Level level, final
String what, final SwallowAction code,
+ public static void swallow(final Logger log, final Level level, final
String errorMessage, final SwallowAction code,
final AtomicReference<Throwable>
firstException) {
if (code != null) {
try {
@@ -1050,20 +1066,20 @@ public final class Utils {
} catch (Throwable t) {
switch (level) {
case INFO:
- log.info(what, t);
+ log.info(errorMessage, t);
break;
case DEBUG:
- log.debug(what, t);
+ log.debug(errorMessage, t);
break;
case ERROR:
- log.error(what, t);
+ log.error(errorMessage, t);
break;
case TRACE:
- log.trace(what, t);
+ log.trace(errorMessage, t);
break;
case WARN:
default:
- log.warn(what, t);
+ log.warn(errorMessage, t);
}
if (firstException != null)
firstException.compareAndSet(null, t);
@@ -1723,6 +1739,34 @@ public final class Utils {
return all;
}
+ /**
+ * Register the given mbean with the platform mbean server,
+ * unregistering any mbean that was there before. Note,
+ * this method will not throw an exception if the registration
+ * fails (since there is nothing you can do, and it isn't fatal),
+ * instead it just returns false indicating the registration failed.
+ *
+ * @param mbean The object to register as a mbean
+ * @param name The name to register this mbean with
+ * @return true if the registration succeeded
+ */
+ public static boolean registerMBean(Object mbean, String name) {
+ try {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ synchronized (mbs) {
+ ObjectName objName = new ObjectName(name);
+ if (mbs.isRegistered(objName)) {
+ mbs.unregisterMBean(objName);
+ }
+ mbs.registerMBean(mbean, objName);
+ return true;
+ }
+ } catch (Exception e) {
+ log.error("Failed to register Mbean with name {}", name, e);
+ return false;
+ }
+ }
+
/**
* A runnable that can throw checked exception.
*/
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index 76a4b247edd..718a9feba44 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.{CompletableFuture,
ConcurrentHashMap, CopyOnWriteAr
import kafka.log._
import kafka.server._
import kafka.server.share.DelayedShareFetch
-import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicIdPartition,
TopicPartition, Uuid}
import org.apache.kafka.common.errors._
@@ -35,7 +34,7 @@ import
org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record.{FileRecords, MemoryRecords, RecordBatch}
import org.apache.kafka.common.requests._
import
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
UNDEFINED_EPOCH_OFFSET}
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.logger.StateChangeLogger
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState,
MetadataCache, PartitionRegistration}
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
@@ -48,6 +47,7 @@ import
org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperation
import org.apache.kafka.server.replica.Replica
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams,
UnexpectedAppendOffsetException}
+import org.apache.kafka.server.util.LockUtils.{inReadLock, inWriteLock}
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.slf4j.event.Level
@@ -67,11 +67,11 @@ class DelayedOperations(topicId: Option[Uuid],
def checkAndCompleteAll(): Unit = {
val requestKey = new TopicPartitionOperationKey(topicPartition)
- CoreUtils.swallow(() -> fetch.checkAndComplete(requestKey), this,
Level.ERROR)
- CoreUtils.swallow(() -> produce.checkAndComplete(requestKey), this,
Level.ERROR)
- CoreUtils.swallow(() -> deleteRecords.checkAndComplete(requestKey), this,
Level.ERROR)
- if (topicId.isDefined) CoreUtils.swallow(() ->
shareFetch.checkAndComplete(new DelayedShareFetchPartitionKey(
- topicId.get, topicPartition.partition())), this, Level.ERROR)
+ Utils.swallow(this.logger.underlying, Level.ERROR, () =>
fetch.checkAndComplete(requestKey))
+ Utils.swallow(this.logger.underlying, Level.ERROR, () =>
produce.checkAndComplete(requestKey))
+ Utils.swallow(this.logger.underlying, Level.ERROR, () =>
deleteRecords.checkAndComplete(requestKey))
+ if (topicId.isDefined) Utils.swallow(this.logger.underlying, Level.ERROR,
() => shareFetch.checkAndComplete(
+ new DelayedShareFetchPartitionKey(topicId.get,
topicPartition.partition())))
}
def numDelayedDelete: Int = deleteRecords.numDelayed()
@@ -266,7 +266,7 @@ class Partition(val topicPartition: TopicPartition,
def inSyncReplicaIds: Set[Int] =
partitionState.isr.asScala.map(_.toInt).toSet
def maybeAddListener(listener: PartitionListener): Boolean = {
- inReadLock(leaderIsrUpdateLock) {
+ inReadLock(leaderIsrUpdateLock, () => {
// `log` is set to `None` when the partition is failed or deleted.
log match {
case Some(_) =>
@@ -276,7 +276,7 @@ class Partition(val topicPartition: TopicPartition,
case None =>
false
}
- }
+ })
}
def removeListener(listener: PartitionListener): Unit = {
@@ -295,7 +295,7 @@ class Partition(val topicPartition: TopicPartition,
// The writeLock is needed to make sure that while the caller checks the
log directory of the
// current replica and the existence of the future replica, no other
thread can update the log directory of the
// current replica or remove the future replica.
- inWriteLock(leaderIsrUpdateLock) {
+ inWriteLock(leaderIsrUpdateLock, () => {
val currentLogDir = localLogOrException.parentDir
if (currentLogDir == logDir) {
info(s"Current log directory $currentLogDir is same as requested log
dir $logDir. " +
@@ -314,7 +314,7 @@ class Partition(val topicPartition: TopicPartition,
true
}
}
- }
+ })
}
def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean,
offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid],
@@ -462,17 +462,17 @@ class Partition(val topicPartition: TopicPartition,
remoteReplicasMap.values.asScala
def futureReplicaDirChanged(newDestinationDir: String): Boolean = {
- inReadLock(leaderIsrUpdateLock) {
+ inReadLock(leaderIsrUpdateLock, () => {
futureLog.exists(_.parentDir != newDestinationDir)
- }
+ })
}
def removeFutureLocalReplica(deleteFromLogDir: Boolean = true): Unit = {
- inWriteLock(leaderIsrUpdateLock) {
+ inWriteLock[Exception](leaderIsrUpdateLock, () => {
futureLog = None
if (deleteFromLogDir)
logManager.asyncDelete(topicPartition, isFuture = true)
- }
+ })
}
// Returns a VerificationGuard if we need to verify. This starts or
continues the verification process. Otherwise return the
@@ -504,7 +504,7 @@ class Partition(val topicPartition: TopicPartition,
if (futureReplicaLEO.contains(localReplicaLEO)) {
// The write lock is needed to make sure that while
ReplicaAlterDirThread checks the LEO of the
// current replica, no other thread can update LEO of the current
replica via log truncation or log append operation.
- inWriteLock(leaderIsrUpdateLock) {
+ inWriteLock(leaderIsrUpdateLock, () => {
futureLog match {
case Some(futurePartitionLog) =>
if (log.exists(_.logEndOffset ==
futurePartitionLog.logEndOffset)) {
@@ -518,7 +518,7 @@ class Partition(val topicPartition: TopicPartition,
// state again to avoid race condition
false
}
- }
+ })
} else false
}
}
@@ -531,13 +531,13 @@ class Partition(val topicPartition: TopicPartition,
*/
def delete(): Unit = {
// need to hold the lock to prevent appendMessagesToLeader() from hitting
I/O exceptions due to log being deleted
- inWriteLock(leaderIsrUpdateLock) {
+ inWriteLock[Exception](leaderIsrUpdateLock, () => {
clear()
listeners.forEach { listener =>
listener.onDeleted(topicPartition)
}
listeners.clear()
- }
+ })
}
/**
@@ -545,14 +545,14 @@ class Partition(val topicPartition: TopicPartition,
* transitions to Offline.
*/
def markOffline(): Unit = {
- inWriteLock(leaderIsrUpdateLock) {
+ inWriteLock[Exception](leaderIsrUpdateLock, () => {
clear()
listeners.forEach { listener =>
listener.onFailed(topicPartition)
}
listeners.clear()
- }
+ })
}
/**
@@ -589,7 +589,7 @@ class Partition(val topicPartition: TopicPartition,
highWatermarkCheckpoints: OffsetCheckpoints,
topicId: Option[Uuid],
targetDirectoryId: Option[Uuid] = None): Boolean = {
- val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
+ val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock,
() => {
// Partition state changes are expected to have a partition epoch larger
or equal
// to the current partition epoch. The latter is allowed because the
partition epoch
// is also updated by the AlterPartition response so the new epoch might
be known
@@ -675,7 +675,7 @@ class Partition(val topicPartition: TopicPartition,
// We may need to increment high watermark since ISR could be down to 1.
(maybeIncrementLeaderHW(leaderLog, currentTimeMs = currentTimeMs),
isNewLeader)
- }
+ })
// Some delayed operations may be unblocked after HW changed.
if (leaderHWIncremented)
@@ -695,7 +695,7 @@ class Partition(val topicPartition: TopicPartition,
highWatermarkCheckpoints: OffsetCheckpoints,
topicId: Option[Uuid],
targetLogDirectoryId: Option[Uuid] = None): Boolean = {
- inWriteLock(leaderIsrUpdateLock) {
+ inWriteLock(leaderIsrUpdateLock, () => {
if (partitionRegistration.partitionEpoch < partitionEpoch) {
stateChangeLogger.info(s"Skipped the become-follower state change for
$topicPartition with topic id $topicId, " +
s"partition registration $partitionRegistration and isNew=$isNew
since the follower is already at a newer partition epoch $partitionEpoch.")
@@ -738,7 +738,7 @@ class Partition(val topicPartition: TopicPartition,
// We must restart the fetchers when the leader epoch changed regardless
of
// whether the leader changed as well.
isNewLeaderEpoch
- }
+ })
}
private def createLogInAssignedDirectoryId(isNew: Boolean,
highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid],
targetLogDirectoryId: Option[Uuid]): Unit = {
@@ -777,7 +777,7 @@ class Partition(val topicPartition: TopicPartition,
// Apply read lock here to avoid the race between ISR updates and the
fetch requests from rebooted follower. It
// could break the broker epoch checks in the ISR expansion.
- inReadLock(leaderIsrUpdateLock) {
+ inReadLock[Exception](leaderIsrUpdateLock, () => {
replica.updateFetchStateOrThrow(
followerFetchOffsetMetadata,
followerStartOffset,
@@ -785,7 +785,7 @@ class Partition(val topicPartition: TopicPartition,
leaderEndOffset,
brokerEpoch
)
- }
+ })
val newLeaderLW = if (delayedOperations.numDelayedDelete > 0)
lowWatermarkIfLeader else -1L
// check if the LW of the partition has incremented
@@ -800,9 +800,9 @@ class Partition(val topicPartition: TopicPartition,
val leaderHWIncremented = if (prevFollowerEndOffset !=
replica.stateSnapshot.logEndOffset) {
// the leader log may be updated by ReplicaAlterLogDirsThread so the
following method must be in lock of
// leaderIsrUpdateLock to prevent adding new hw to invalid log.
- inReadLock(leaderIsrUpdateLock) {
+ inReadLock(leaderIsrUpdateLock, () => {
leaderLogIfLocal.exists(leaderLog => maybeIncrementLeaderHW(leaderLog,
followerFetchTimeMs))
- }
+ })
} else {
false
}
@@ -873,11 +873,11 @@ class Partition(val topicPartition: TopicPartition,
* This function can be triggered when a replica's LEO has incremented.
*/
private def maybeExpandIsr(followerReplica: Replica): Unit = {
- val needsIsrUpdate = !partitionState.isInflight &&
canAddReplicaToIsr(followerReplica.brokerId) && inReadLock(leaderIsrUpdateLock)
{
+ val needsIsrUpdate = !partitionState.isInflight &&
canAddReplicaToIsr(followerReplica.brokerId) && inReadLock(leaderIsrUpdateLock,
() => {
needsExpandIsr(followerReplica)
- }
+ })
if (needsIsrUpdate) {
- val alterIsrUpdateOpt = inWriteLock(leaderIsrUpdateLock) {
+ val alterIsrUpdateOpt = inWriteLock(leaderIsrUpdateLock, () => {
// check if this replica needs to be added to the ISR
partitionState match {
case currentState: CommittedPartitionState if
needsExpandIsr(followerReplica) =>
@@ -885,7 +885,7 @@ class Partition(val topicPartition: TopicPartition,
case _ =>
None
}
- }
+ })
// Send the AlterPartition request outside of the LeaderAndIsr lock
since the completion logic
// may increment the high watermark (and consequently complete delayed
operations).
alterIsrUpdateOpt.foreach(submitAlterPartition)
@@ -1087,13 +1087,13 @@ class Partition(val topicPartition: TopicPartition,
def maybeShrinkIsr(): Unit = {
def needsIsrUpdate: Boolean = {
- !partitionState.isInflight && inReadLock(leaderIsrUpdateLock) {
+ !partitionState.isInflight && inReadLock(leaderIsrUpdateLock, () => {
needsShrinkIsr()
- }
+ })
}
if (needsIsrUpdate) {
- val alterIsrUpdateOpt = inWriteLock(leaderIsrUpdateLock) {
+ val alterIsrUpdateOpt = inWriteLock(leaderIsrUpdateLock, () => {
leaderLogIfLocal.flatMap { leaderLog =>
val outOfSyncReplicaIds = getOutOfSyncReplicas(replicaLagTimeMaxMs)
partitionState match {
@@ -1118,7 +1118,7 @@ class Partition(val topicPartition: TopicPartition,
None
}
}
- }
+ })
// Send the AlterPartition request outside of the LeaderAndIsr lock
since the completion logic
// may increment the high watermark (and consequently complete delayed
operations).
alterIsrUpdateOpt.foreach(submitAlterPartition)
@@ -1171,11 +1171,11 @@ class Partition(val topicPartition: TopicPartition,
if (isFuture) {
// The read lock is needed to handle race condition if request handler
thread tries to
// remove future replica after receiving AlterReplicaLogDirsRequest.
- inReadLock(leaderIsrUpdateLock) {
+ inReadLock(leaderIsrUpdateLock, () => {
// Note the replica may be undefined if it is removed by a
non-ReplicaAlterLogDirsThread before
// this method is called
futureLog.map { _.appendAsFollower(records, partitionLeaderEpoch) }
- }
+ })
} else {
// The lock is needed to prevent the follower replica from being updated
while ReplicaAlterDirThread
// is executing maybeReplaceCurrentWithFutureReplica() to replace
follower replica with the future replica.
@@ -1223,7 +1223,7 @@ class Partition(val topicPartition: TopicPartition,
verificationGuard: VerificationGuard = VerificationGuard.SENTINEL,
transactionVersion: Short = TransactionVersion.TV_UNKNOWN
): LogAppendInfo = {
- val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
+ val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock, () => {
leaderLogIfLocal match {
case Some(leaderLog) =>
val minIsr = effectiveMinIsr(leaderLog)
@@ -1245,7 +1245,7 @@ class Partition(val topicPartition: TopicPartition,
throw new NotLeaderOrFollowerException("Leader not local for
partition %s on broker %d"
.format(topicPartition, localBrokerId))
}
- }
+ })
info.copy(if (leaderHWIncremented) LeaderHwChange.INCREASED else
LeaderHwChange.SAME)
}
@@ -1296,7 +1296,7 @@ class Partition(val topicPartition: TopicPartition,
if (fetchParams.isFromFollower) {
// Check that the request is from a valid replica before doing the read
- val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock) {
+ val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock, () => {
val localLog = localLogWithEpochOrThrow(
fetchPartitionData.currentLeaderEpoch,
fetchParams.fetchOnlyLeader
@@ -1307,7 +1307,7 @@ class Partition(val topicPartition: TopicPartition,
)
val logReadInfo = readFromLocalLog(localLog)
(replica, logReadInfo)
- }
+ })
if (updateFetchState && !logReadInfo.divergingEpoch.isPresent) {
updateFollowerFetchState(
@@ -1322,13 +1322,13 @@ class Partition(val topicPartition: TopicPartition,
logReadInfo
} else {
- inReadLock(leaderIsrUpdateLock) {
+ inReadLock(leaderIsrUpdateLock, () => {
val localLog = localLogWithEpochOrThrow(
fetchPartitionData.currentLeaderEpoch,
fetchParams.fetchOnlyLeader
)
readFromLocalLog(localLog)
- }
+ })
}
}
@@ -1433,7 +1433,7 @@ class Partition(val topicPartition: TopicPartition,
isolationLevel: Option[IsolationLevel],
currentLeaderEpoch: Optional[Integer],
fetchOnlyFromLeader: Boolean,
- remoteLogManager: Option[RemoteLogManager] =
None): OffsetResultHolder = inReadLock(leaderIsrUpdateLock) {
+ remoteLogManager: Option[RemoteLogManager] =
None): OffsetResultHolder = inReadLock(leaderIsrUpdateLock, () => {
// decide whether to only fetch from leader
val localLog = localLogWithEpochOrThrow(currentLeaderEpoch,
fetchOnlyFromLeader)
@@ -1478,7 +1478,7 @@ class Partition(val topicPartition: TopicPartition,
offsetResultHolder.lastFetchableOffset(Optional.of(lastFetchableOffset))
offsetResultHolder
}
- }
+ })
def activeProducerState: DescribeProducersResponseData.PartitionResponse = {
val producerState = new DescribeProducersResponseData.PartitionResponse()
@@ -1498,16 +1498,16 @@ class Partition(val topicPartition: TopicPartition,
}
def fetchOffsetSnapshot(currentLeaderEpoch: Optional[Integer],
- fetchOnlyFromLeader: Boolean): LogOffsetSnapshot =
inReadLock(leaderIsrUpdateLock) {
+ fetchOnlyFromLeader: Boolean): LogOffsetSnapshot =
inReadLock(leaderIsrUpdateLock, () => {
// decide whether to only fetch from leader
val localLog = localLogWithEpochOrThrow(currentLeaderEpoch,
fetchOnlyFromLeader)
localLog.fetchOffsetSnapshot
- }
+ })
def logStartOffset: Long = {
- inReadLock(leaderIsrUpdateLock) {
+ inReadLock[Long, Exception](leaderIsrUpdateLock, () => {
leaderLogIfLocal.map(_.logStartOffset).getOrElse(-1)
- }
+ })
}
/**
@@ -1516,7 +1516,7 @@ class Partition(val topicPartition: TopicPartition,
*
* Return low watermark of the partition.
*/
- def deleteRecordsOnLeader(offset: Long): LogDeleteRecordsResult =
inReadLock(leaderIsrUpdateLock) {
+ def deleteRecordsOnLeader(offset: Long): LogDeleteRecordsResult =
inReadLock(leaderIsrUpdateLock, () => {
leaderLogIfLocal match {
case Some(leaderLog) =>
if (!leaderLog.config.delete && leaderLog.config.compact)
@@ -1537,7 +1537,7 @@ class Partition(val topicPartition: TopicPartition,
case None =>
throw new NotLeaderOrFollowerException(s"Leader not local for
partition $topicPartition on broker $localBrokerId")
}
- }
+ })
/**
* Truncate the local log of this partition to the specified offset and
checkpoint the recovery point to this offset
@@ -1548,9 +1548,9 @@ class Partition(val topicPartition: TopicPartition,
def truncateTo(offset: Long, isFuture: Boolean): Unit = {
// The read lock is needed to prevent the follower replica from being
truncated while ReplicaAlterDirThread
// is executing maybeReplaceCurrentWithFutureReplica() to replace follower
replica with the future replica.
- inReadLock(leaderIsrUpdateLock) {
+ inReadLock[Exception](leaderIsrUpdateLock, () => {
logManager.truncateTo(Map(topicPartition -> offset), isFuture = isFuture)
- }
+ })
}
/**
@@ -1565,9 +1565,9 @@ class Partition(val topicPartition: TopicPartition,
logStartOffsetOpt: Optional[JLong] =
Optional.empty): Unit = {
// The read lock is needed to prevent the follower replica from being
truncated while ReplicaAlterDirThread
// is executing maybeReplaceCurrentWithFutureReplica() to replace follower
replica with the future replica.
- inReadLock(leaderIsrUpdateLock) {
+ inReadLock[Exception](leaderIsrUpdateLock, () => {
logManager.truncateFullyAndStartAt(topicPartition, newOffset, isFuture =
isFuture, logStartOffsetOpt)
- }
+ })
}
/**
@@ -1586,7 +1586,7 @@ class Partition(val topicPartition: TopicPartition,
def lastOffsetForLeaderEpoch(currentLeaderEpoch: Optional[Integer],
leaderEpoch: Int,
fetchOnlyFromLeader: Boolean): EpochEndOffset =
{
- inReadLock(leaderIsrUpdateLock) {
+ inReadLock(leaderIsrUpdateLock, () => {
val localLogOrError = getLocalLog(currentLeaderEpoch,
fetchOnlyFromLeader)
localLogOrError match {
case Left(localLog) =>
@@ -1604,7 +1604,7 @@ class Partition(val topicPartition: TopicPartition,
.setPartition(partitionId)
.setErrorCode(error.code)
}
- }
+ })
}
private def prepareIsrExpand(
@@ -1695,7 +1695,7 @@ class Partition(val topicPartition: TopicPartition,
var hwIncremented = false
var shouldRetry = false
- inWriteLock(leaderIsrUpdateLock) {
+ inWriteLock[Exception](leaderIsrUpdateLock, () => {
if (partitionState != proposedIsrState) {
// This means partitionState was updated through leader election or
some other mechanism
// before we got the AlterPartition response. We don't know what
happened on the controller
@@ -1707,7 +1707,7 @@ class Partition(val topicPartition: TopicPartition,
} else {
shouldRetry = handleAlterPartitionError(proposedIsrState,
Errors.forException(e))
}
- }
+ })
if (hwIncremented) {
tryCompleteDelayedRequests()
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 800ac310a13..9f647cd313a 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.server.ReplicaManager
-import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils.Logging
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.internals.Topic
@@ -42,12 +41,12 @@ import org.apache.kafka.server.config.ServerConfigs
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.Scheduler
+import org.apache.kafka.server.util.LockUtils.{inReadLock, inWriteLock}
import org.apache.kafka.storage.internals.log.AppendOrigin
import com.google.re2j.{Pattern, PatternSyntaxException}
import org.apache.kafka.common.errors.InvalidRegularExpression
import java.util.Optional
-
import scala.jdk.CollectionConverters._
import scala.collection.mutable
@@ -118,9 +117,9 @@ class TransactionStateManager(brokerId: Int,
// visible for testing only
private[transaction] def addLoadingPartition(partitionId: Int,
coordinatorEpoch: Int): Unit = {
val partitionAndLeaderEpoch =
TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
- inWriteLock(stateLock) {
+ inWriteLock(stateLock, () => {
loadingPartitions.add(partitionAndLeaderEpoch)
- }
+ })
}
// this is best-effort expiration of an ongoing transaction which has been
open for more than its
@@ -129,7 +128,7 @@ class TransactionStateManager(brokerId: Int,
// metadata to abort later.
def timedOutTransactions(): Iterable[TransactionalIdAndProducerIdEpoch] = {
val now = time.milliseconds()
- inReadLock(stateLock) {
+ inReadLock(stateLock, () => {
transactionMetadataCache.flatMap { case (_, entry) =>
entry.metadataPerTransactionalId.asScala.filter { case (_,
txnMetadata) =>
if (txnMetadata.pendingTransitionInProgress) {
@@ -147,14 +146,14 @@ class TransactionStateManager(brokerId: Int,
TransactionalIdAndProducerIdEpoch(txnId, txnMetadata.producerId,
txnMetadata.producerEpoch)
}
}
- }
+ })
}
private def removeExpiredTransactionalIds(
transactionPartition: TopicPartition,
txnMetadataCacheEntry: TxnMetadataCacheEntry,
): Unit = {
- inReadLock(stateLock) {
+ inReadLock[Exception](stateLock, () => {
replicaManager.getLogConfig(transactionPartition) match {
case Some(logConfig) =>
val currentTimeMs = time.milliseconds()
@@ -219,7 +218,7 @@ class TransactionStateManager(brokerId: Int,
warn(s"Transaction expiration for partition $transactionPartition
failed because the log " +
"config was not available, which likely means the partition is not
online or is no longer local.")
}
- }
+ })
}
private def shouldExpire(
@@ -245,12 +244,12 @@ class TransactionStateManager(brokerId: Int,
}
private[transaction] def removeExpiredTransactionalIds(): Unit = {
- inReadLock(stateLock) {
+ inReadLock[Exception](stateLock, () => {
transactionMetadataCache.foreachEntry { (partitionId,
partitionCacheEntry) =>
val transactionPartition = new
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
removeExpiredTransactionalIds(transactionPartition,
partitionCacheEntry)
}
- }
+ })
}
private def writeTombstonesForExpiredTransactionalIds(
@@ -260,7 +259,7 @@ class TransactionStateManager(brokerId: Int,
): Unit = {
def removeFromCacheCallback(responses: collection.Map[TopicIdPartition,
PartitionResponse]): Unit = {
responses.foreachEntry { (topicPartition, response) =>
- inReadLock(stateLock) {
+ inReadLock[Exception](stateLock, () => {
transactionMetadataCache.get(topicPartition.partition).foreach {
txnMetadataCacheEntry =>
expiredForPartition.foreach { idCoordinatorEpochAndMetadata =>
val transactionalId =
idCoordinatorEpochAndMetadata.transactionalId
@@ -283,11 +282,11 @@ class TransactionStateManager(brokerId: Int,
})
}
}
- }
+ })
}
}
- inReadLock(stateLock) {
+ inReadLock[Exception](stateLock, () => {
replicaManager.appendRecords(
timeout = config.requestTimeoutMs,
requiredAcks = TransactionLog.ENFORCED_REQUIRED_ACKS,
@@ -296,7 +295,7 @@ class TransactionStateManager(brokerId: Int,
entriesPerPartition =
Map(replicaManager.topicIdPartition(transactionPartition) -> tombstoneRecords),
responseCallback = removeFromCacheCallback,
requestLocal = RequestLocal.noCaching)
- }
+ })
}
def enableTransactionalIdExpiration(): Unit = {
@@ -323,7 +322,7 @@ class TransactionStateManager(brokerId: Int,
filterDurationMs: Long,
filterTransactionalIdPattern: String
): ListTransactionsResponseData = {
- inReadLock(stateLock) {
+ inReadLock(stateLock, () => {
val response = new ListTransactionsResponseData()
if (loadingPartitions.nonEmpty) {
response.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code)
@@ -382,7 +381,7 @@ class TransactionStateManager(brokerId: Int,
response.setErrorCode(Errors.NONE.code)
.setTransactionStates(states)
}
- }
+ })
}
/**
@@ -394,7 +393,7 @@ class TransactionStateManager(brokerId: Int,
*/
private def getAndMaybeAddTransactionState(transactionalId: String,
createdTxnMetadataOpt:
Option[TransactionMetadata]): Either[Errors,
Option[CoordinatorEpochAndTxnMetadata]] = {
- inReadLock(stateLock) {
+ inReadLock(stateLock, () => {
val partitionId = partitionFor(transactionalId)
if (loadingPartitions.exists(_.txnPartitionId == partitionId))
Left(Errors.COORDINATOR_LOAD_IN_PROGRESS)
@@ -413,7 +412,7 @@ class TransactionStateManager(brokerId: Int,
Left(Errors.NOT_COORDINATOR)
}
}
- }
+ })
}
/**
@@ -465,9 +464,9 @@ class TransactionStateManager(brokerId: Int,
var readAtLeastOneRecord = true
try {
- while (currOffset < logEndOffset && readAtLeastOneRecord &&
!shuttingDown.get() && inReadLock(stateLock) {
+ while (currOffset < logEndOffset && readAtLeastOneRecord &&
!shuttingDown.get() && inReadLock(stateLock, () => {
loadingPartitions.exists { idAndEpoch:
TransactionPartitionAndLeaderEpoch =>
- idAndEpoch.txnPartitionId == topicPartition.partition &&
idAndEpoch.coordinatorEpoch == coordinatorEpoch}}) {
+ idAndEpoch.txnPartitionId == topicPartition.partition &&
idAndEpoch.coordinatorEpoch == coordinatorEpoch}})) {
val fetchDataInfo = log.read(currOffset,
config.transactionLogLoadBufferSize, FetchIsolation.LOG_END, true)
readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
@@ -547,9 +546,9 @@ class TransactionStateManager(brokerId: Int,
val topicPartition = new
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
val partitionAndLeaderEpoch =
TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
- inWriteLock(stateLock) {
+ inWriteLock(stateLock, () => {
loadingPartitions.add(partitionAndLeaderEpoch)
- }
+ })
def loadTransactions(startTimeMs: java.lang.Long): Unit = {
val schedulerTimeMs = time.milliseconds() - startTimeMs
@@ -563,7 +562,7 @@ class TransactionStateManager(brokerId: Int,
info(s"Finished loading ${loadedTransactions.size} transaction metadata
from $topicPartition in " +
s"$totalLoadingTimeMs milliseconds, of which $schedulerTimeMs
milliseconds was spent in the scheduler.")
- inWriteLock(stateLock) {
+ inWriteLock[Exception](stateLock, () => {
if (loadingPartitions.contains(partitionAndLeaderEpoch)) {
addLoadedTransactionsToCache(topicPartition.partition,
coordinatorEpoch, loadedTransactions)
@@ -595,7 +594,7 @@ class TransactionStateManager(brokerId: Int,
txnTransitMetadata.txnMetadata,
txnTransitMetadata.transitMetadata)
}
}
- }
+ })
info(s"Completed loading transaction metadata from $topicPartition for
coordinator epoch $coordinatorEpoch")
}
@@ -606,13 +605,13 @@ class TransactionStateManager(brokerId: Int,
def removeTransactionsForTxnTopicPartition(partitionId: Int): Unit = {
val topicPartition = new
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
- inWriteLock(stateLock) {
+ inWriteLock[Exception](stateLock, () => {
loadingPartitions --= loadingPartitions.filter(_.txnPartitionId ==
partitionId)
transactionMetadataCache.remove(partitionId).foreach {
txnMetadataCacheEntry =>
info(s"Unloaded transaction metadata $txnMetadataCacheEntry for
$topicPartition following " +
s"local partition deletion")
}
- }
+ })
}
/**
@@ -622,7 +621,7 @@ class TransactionStateManager(brokerId: Int,
def removeTransactionsForTxnTopicPartition(partitionId: Int,
coordinatorEpoch: Int): Unit = {
val topicPartition = new
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
- inWriteLock(stateLock) {
+ inWriteLock[Exception](stateLock, () => {
removeLoadingPartitionWithEpoch(partitionId, coordinatorEpoch)
transactionMetadataCache.remove(partitionId) match {
case Some(txnMetadataCacheEntry) =>
@@ -631,7 +630,7 @@ class TransactionStateManager(brokerId: Int,
case None =>
info(s"No cached transaction metadata found for $topicPartition
during become-follower transition")
}
- }
+ })
}
/**
@@ -777,7 +776,7 @@ class TransactionStateManager(brokerId: Int,
responseCallback(responseError)
}
- inReadLock(stateLock) {
+ inReadLock[Exception](stateLock, () => {
// we need to hold the read lock on the transaction metadata cache until
appending to local log returns;
// this is to avoid the case where an emigration followed by an
immigration could have completed after the check
// returns and before appendRecords() is called, since otherwise entries
with a high coordinator epoch could have
@@ -818,7 +817,7 @@ class TransactionStateManager(brokerId: Int,
trace(s"Appending new metadata $newMetadata for transaction id
$transactionalId with coordinator epoch $coordinatorEpoch to the local
transaction log")
}
}
- }
+ })
}
def startup(retrieveTransactionTopicPartitionCount: () => Int,
enableTransactionalIdExpiration: Boolean): Unit = {
diff --git a/core/src/main/scala/kafka/log/LogManager.scala
b/core/src/main/scala/kafka/log/LogManager.scala
index bfee35061f8..5ea048cf4d7 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -24,7 +24,7 @@ import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import kafka.server.{KafkaConfig, KafkaRaftServer}
import kafka.utils.threadsafe
-import kafka.utils.{CoreUtils, Logging}
+import kafka.utils.Logging
import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition,
Uuid}
import org.apache.kafka.common.utils.{Exit, KafkaThread, Time, Utils}
import org.apache.kafka.common.errors.{InconsistentTopicIdException,
KafkaStorageException, LogDirNotFoundException}
@@ -253,7 +253,7 @@ class LogManager(logDirs: Seq[File],
warn(s"Logs for partitions
${offlineCurrentTopicPartitions.mkString(",")} are offline and " +
s"logs for future partitions
${offlineFutureTopicPartitions.mkString(",")} are offline due to failure on log
directory $dir")
- dirLocks.filter(_.file.getParent == dir).foreach(dir =>
CoreUtils.swallow(dir.destroy(), this))
+ dirLocks.filter(_.file.getParent == dir).foreach(dir =>
Utils.swallow(this.logger.underlying, () => dir.destroy()))
}
}
@@ -656,7 +656,7 @@ class LogManager(logDirs: Seq[File],
// stop the cleaner first
if (cleaner != null) {
- CoreUtils.swallow(cleaner.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () => cleaner.shutdown())
}
val localLogsByDir = logsByDir
@@ -704,7 +704,7 @@ class LogManager(logDirs: Seq[File],
loadLogsCompletedFlags.getOrDefault(logDirAbsolutePath, false)) {
val cleanShutdownFileHandler = new
CleanShutdownFileHandler(dir.getPath)
debug(s"Writing clean shutdown marker at $dir with broker
epoch=$brokerEpoch")
- CoreUtils.swallow(cleanShutdownFileHandler.write(brokerEpoch),
this)
+ Utils.swallow(this.logger.underlying, () =>
cleanShutdownFileHandler.write(brokerEpoch))
}
}
}
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
index eb6bae3ced6..363fab42036 100755
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
@@ -20,7 +20,7 @@
package kafka.metrics
-import kafka.utils.{CoreUtils, VerifiableProperties}
+import kafka.utils.VerifiableProperties
import org.apache.kafka.common.utils.Utils
import java.util.concurrent.atomic.AtomicBoolean
@@ -67,7 +67,7 @@ object KafkaMetricsReporter {
reporter.init(verifiableProps)
reporters += reporter
reporter match {
- case bean: KafkaMetricsReporterMBean =>
CoreUtils.registerMBean(reporter, bean.getMBeanName)
+ case bean: KafkaMetricsReporterMBean =>
Utils.registerMBean(reporter, bean.getMBeanName)
case _ =>
}
})
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala
b/core/src/main/scala/kafka/network/SocketServer.scala
index 306b633f6fa..b9de13d637f 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -929,7 +929,7 @@ private[kafka] class Processor(
}
} finally {
debug(s"Closing selector - processor $id")
- CoreUtils.swallow(closeAll(), this, Level.ERROR)
+ Utils.swallow(this.logger.underlying, Level.ERROR, () => closeAll())
}
}
@@ -1112,7 +1112,8 @@ private[kafka] class Processor(
// the channel has been closed by the selector but the quotas still
need to be updated
connectionQuotas.dec(listenerName, InetAddress.getByName(remoteHost))
// Call listeners to notify for closed connection.
- connectionDisconnectListeners.foreach(listener => CoreUtils.swallow(()
-> listener.onDisconnect(connectionId), this, Level.ERROR))
+ connectionDisconnectListeners.foreach(listener =>
Utils.swallow(this.logger.underlying, Level.ERROR, () => {
+ () -> listener.onDisconnect(connectionId)}))
} catch {
case e: Throwable => processException(s"Exception while processing
disconnection of $connectionId", e)
}
@@ -1142,7 +1143,8 @@ private[kafka] class Processor(
connectionQuotas.dec(listenerName, address)
selector.close(connectionId)
// Call listeners to notify for closed connection.
- connectionDisconnectListeners.foreach(listener => CoreUtils.swallow(()
-> listener.onDisconnect(connectionId), this, Level.ERROR))
+ connectionDisconnectListeners.foreach(listener =>
Utils.swallow(this.logger.underlying, Level.ERROR, () => {
+ () -> listener.onDisconnect(connectionId)}))
inflightResponses.remove(connectionId).foreach(response =>
updateRequestMetrics(response))
}
@@ -1273,7 +1275,7 @@ private[kafka] class Processor(
beginShutdown()
thread.join()
if (!started.get) {
- CoreUtils.swallow(closeAll(), this, Level.ERROR)
+ Utils.swallow(this.logger.underlying, Level.ERROR, () => closeAll())
}
} finally {
metricsGroup.removeMetric("IdlePercent", Map("networkProcessor" ->
id.toString).asJava)
diff --git a/core/src/main/scala/kafka/raft/KafkaRaftManager.scala
b/core/src/main/scala/kafka/raft/KafkaRaftManager.scala
index 8bd616079c6..1d1d72767db 100644
--- a/core/src/main/scala/kafka/raft/KafkaRaftManager.scala
+++ b/core/src/main/scala/kafka/raft/KafkaRaftManager.scala
@@ -23,7 +23,6 @@ import java.nio.file.Paths
import java.util.{OptionalInt, Collection => JCollection, Map => JMap}
import java.util.concurrent.CompletableFuture
import kafka.server.KafkaConfig
-import kafka.utils.CoreUtils
import kafka.utils.Logging
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater,
MetadataRecoveryStrategy, NetworkClient}
import org.apache.kafka.common.KafkaException
@@ -143,13 +142,13 @@ class KafkaRaftManager[T](
}
def shutdown(): Unit = {
- CoreUtils.swallow(expirationService.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () => expirationService.shutdown())
Utils.closeQuietly(expirationTimer, "expiration timer")
- CoreUtils.swallow(clientDriver.shutdown(), this)
- CoreUtils.swallow(scheduler.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () => clientDriver.shutdown())
+ Utils.swallow(this.logger.underlying, () => scheduler.shutdown())
Utils.closeQuietly(netChannel, "net channel")
Utils.closeQuietly(raftLog, "raft log")
- CoreUtils.swallow(dataDirLock.foreach(_.destroy()), this)
+ Utils.swallow(this.logger.underlying, () =>
dataDirLock.foreach(_.destroy()))
}
override def handleRequest(
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 62bc9363872..115a505643d 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -18,7 +18,6 @@
package kafka.server
import com.yammer.metrics.core.Meter
-import kafka.utils.CoreUtils.inLock
import kafka.utils.Logging
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.PartitionStates
@@ -29,16 +28,12 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
import
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.requests._
-
import org.apache.kafka.common.{ClientIdAndBroker, InvalidRecordException,
TopicPartition, Uuid}
import org.apache.kafka.server.common.OffsetAndEpoch
-import org.apache.kafka.server.LeaderEndPoint
-import org.apache.kafka.server.ResultWithPartitions
-import org.apache.kafka.server.ReplicaState
-import org.apache.kafka.server.PartitionFetchState
+import org.apache.kafka.server.{LeaderEndPoint, PartitionFetchState,
ReplicaState, ResultWithPartitions}
import
org.apache.kafka.server.log.remote.storage.RetriableRemoteStorageException
import org.apache.kafka.server.metrics.KafkaMetricsGroup
-import org.apache.kafka.server.util.ShutdownableThread
+import org.apache.kafka.server.util.{LockUtils, ShutdownableThread}
import org.apache.kafka.storage.internals.log.LogAppendInfo
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -105,9 +100,7 @@ abstract class AbstractFetcherThread(name: String,
override def shutdown(): Unit = {
initiateShutdown()
- inLock(partitionMapLock) {
- partitionMapCond.signalAll()
- }
+ LockUtils.inLock[Exception](partitionMapLock, () =>
partitionMapCond.signalAll())
awaitShutdown()
// we don't need the lock since the thread has finished shutdown and
metric removal is safe
@@ -121,7 +114,7 @@ abstract class AbstractFetcherThread(name: String,
}
private def maybeFetch(): Unit = {
- val fetchRequestOpt = inLock(partitionMapLock) {
+ val fetchRequestOpt = LockUtils.inLock(partitionMapLock, () => {
val result = leader.buildFetch(partitionStates.partitionStateMap)
val fetchRequestOpt = result.result
val partitionsWithError = result.partitionsWithError
@@ -134,7 +127,7 @@ abstract class AbstractFetcherThread(name: String,
}
fetchRequestOpt
- }
+ })
fetchRequestOpt.ifPresent(replicaFetch =>
processFetchRequest(replicaFetch.partitionData,
replicaFetch.fetchRequest)
@@ -153,7 +146,8 @@ abstract class AbstractFetcherThread(name: String,
* Builds offset for leader epoch requests for partitions that are in the
truncating phase based
* on latest epochs of the future replicas (the one that is fetching)
*/
- private def fetchTruncatingPartitions(): (Map[TopicPartition, EpochData],
Set[TopicPartition]) = inLock(partitionMapLock) {
+ private def fetchTruncatingPartitions(): (Map[TopicPartition, EpochData],
Set[TopicPartition]) =
+ LockUtils.inLock(partitionMapLock, () => {
val partitionsWithEpochs = mutable.Map.empty[TopicPartition, EpochData]
val partitionsWithoutEpochs = mutable.Set.empty[TopicPartition]
@@ -172,7 +166,7 @@ abstract class AbstractFetcherThread(name: String,
}
(partitionsWithEpochs, partitionsWithoutEpochs)
- }
+ })
private def maybeTruncate(): Unit = {
val (partitionsWithEpochs, partitionsWithoutEpochs) =
fetchTruncatingPartitions()
@@ -215,7 +209,7 @@ abstract class AbstractFetcherThread(name: String,
val endOffsets =
leader.fetchEpochEndOffsets(latestEpochsForPartitions.asJava)
// Ensure we hold a lock during truncation
- inLock(partitionMapLock) {
+ LockUtils.inLock[Exception](partitionMapLock, () => {
//Check no leadership and no leader epoch changes happened whilst we
were unlocked, fetching epochs
val epochEndOffsets = endOffsets.asScala.filter { case (tp, _) =>
@@ -231,20 +225,21 @@ abstract class AbstractFetcherThread(name: String,
val result = maybeTruncateToEpochEndOffsets(epochEndOffsets,
latestEpochsForPartitions)
handlePartitionsWithErrors(result.partitionsWithError.asScala,
"truncateToEpochEndOffsets")
updateFetchOffsetAndMaybeMarkTruncationComplete(result.result)
- }
+ })
}
// Visibility for unit tests
protected[server] def truncateOnFetchResponse(epochEndOffsets:
Map[TopicPartition, EpochEndOffset]): Unit = {
- inLock(partitionMapLock) {
+ LockUtils.inLock[Exception](partitionMapLock, () => {
val result = maybeTruncateToEpochEndOffsets(epochEndOffsets, Map.empty)
handlePartitionsWithErrors(result.partitionsWithError.asScala,
"truncateOnFetchResponse")
updateFetchOffsetAndMaybeMarkTruncationComplete(result.result)
- }
+ })
}
// Visible for testing
- private[server] def truncateToHighWatermark(partitions:
Set[TopicPartition]): Unit = inLock(partitionMapLock) {
+ private[server] def truncateToHighWatermark(partitions:
Set[TopicPartition]): Unit =
+ LockUtils.inLock[Exception](partitionMapLock, () => {
val fetchOffsets = mutable.HashMap.empty[TopicPartition,
OffsetTruncationState]
for (tp <- partitions) {
@@ -260,7 +255,7 @@ abstract class AbstractFetcherThread(name: String,
}
updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
- }
+ })
private def maybeTruncateToEpochEndOffsets(fetchedEpochs:
Map[TopicPartition, EpochEndOffset],
latestEpochsForPartitions:
Map[TopicPartition, EpochData]): ResultWithPartitions[Map[TopicPartition,
OffsetTruncationState]] = {
@@ -302,7 +297,8 @@ abstract class AbstractFetcherThread(name: String,
*
* @return true if the epoch in this thread is updated. otherwise, false
*/
- private def onPartitionFenced(tp: TopicPartition, requestEpoch:
Optional[Integer]): Boolean = inLock(partitionMapLock) {
+ private def onPartitionFenced(tp: TopicPartition, requestEpoch:
Optional[Integer]): Boolean =
+ LockUtils.inLock(partitionMapLock, () => {
Option(partitionStates.stateValue(tp)).exists { currentFetchState =>
val currentLeaderEpoch = currentFetchState.currentLeaderEpoch
if (requestEpoch.isPresent && requestEpoch.get == currentLeaderEpoch) {
@@ -315,7 +311,7 @@ abstract class AbstractFetcherThread(name: String,
true
}
}
- }
+ })
// visible for testing
private[server] def processFetchRequest(sessionPartitions:
util.Map[TopicPartition, FetchRequest.PartitionData],
@@ -331,16 +327,16 @@ abstract class AbstractFetcherThread(name: String,
case t: Throwable =>
if (isRunning) {
warn(s"Error in response for fetch request $fetchRequest", t)
- inLock(partitionMapLock) {
+ LockUtils.inLock(partitionMapLock, () => {
partitionsWithError ++= partitionStates.partitionSet.asScala
- }
+ })
}
}
fetcherStats.requestRate.mark()
if (responseData.nonEmpty) {
// process fetched data
- inLock(partitionMapLock) {
+ LockUtils.inLock[Exception](partitionMapLock, () => {
responseData.foreachEntry { (topicPartition, partitionData) =>
Option(partitionStates.stateValue(topicPartition)).foreach {
currentFetchState =>
// It's possible that a partition is removed and re-added or
truncated when there is a pending fetch request.
@@ -467,7 +463,7 @@ abstract class AbstractFetcherThread(name: String,
}
}
}
- }
+ })
}
if (divergingEndOffsets.nonEmpty)
@@ -604,8 +600,8 @@ abstract class AbstractFetcherThread(name: String,
* @param tp Topic partition
* @param leaderEpochOffset Epoch end offset received from the leader for
this topic partition
*/
- private def getOffsetTruncationState(tp: TopicPartition,
- leaderEpochOffset: EpochEndOffset):
OffsetTruncationState = inLock(partitionMapLock) {
+ private def getOffsetTruncationState(tp: TopicPartition, leaderEpochOffset:
EpochEndOffset): OffsetTruncationState =
+ LockUtils.inLock(partitionMapLock , () => {
if (leaderEpochOffset.endOffset == UNDEFINED_EPOCH_OFFSET) {
// truncate to initial offset which is the high watermark for follower
replica. For
// future replica, it is either high watermark of the future replica or
current
@@ -655,7 +651,7 @@ abstract class AbstractFetcherThread(name: String,
OffsetTruncationState(min(leaderEpochOffset.endOffset,
replicaEndOffset), truncationCompleted = true)
}
}
- }
+ })
/**
* Handle a partition whose offset is out of range and return a new fetch
offset.
@@ -868,9 +864,10 @@ abstract class AbstractFetcherThread(name: String,
}
// Visible for testing
- private[server] def fetchState(topicPartition: TopicPartition):
Option[PartitionFetchState] = inLock(partitionMapLock) {
+ private[server] def fetchState(topicPartition: TopicPartition):
Option[PartitionFetchState] =
+ LockUtils.inLock(partitionMapLock, () => {
Option(partitionStates.stateValue(topicPartition))
- }
+ })
protected def toMemoryRecords(records: Records): MemoryRecords = {
(records: @unchecked) match {
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index cd061c2c8ab..d9cb3aca495 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -24,7 +24,6 @@ import kafka.network.SocketServer
import kafka.raft.KafkaRaftManager
import kafka.server.metadata._
import kafka.server.share.{ShareCoordinatorMetadataCacheHelperImpl,
SharePartitionManager}
-import kafka.utils.CoreUtils
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -793,14 +792,14 @@ class BrokerServer(
// Stop socket server to stop accepting any more connections and
requests.
// Socket server will be shutdown towards the end of the sequence.
if (socketServer != null) {
- CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
+ Utils.swallow(this.logger.underlying, () =>
socketServer.stopProcessingRequests())
}
metadataPublishers.forEach(p =>
sharedServer.loader.removeAndClosePublisher(p).get())
metadataPublishers.clear()
if (dataPlaneRequestHandlerPool != null)
- CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () =>
dataPlaneRequestHandlerPool.shutdown())
if (dataPlaneRequestProcessor != null)
- CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
+ Utils.swallow(this.logger.underlying, () =>
dataPlaneRequestProcessor.close())
authorizerPlugin.foreach(Utils.closeQuietly(_, "authorizer plugin"))
/**
@@ -814,44 +813,44 @@ class BrokerServer(
* broker would have to take hours to recover the log during restart.
*/
if (kafkaScheduler != null)
- CoreUtils.swallow(kafkaScheduler.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () => kafkaScheduler.shutdown())
if (transactionCoordinator != null)
- CoreUtils.swallow(transactionCoordinator.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () =>
transactionCoordinator.shutdown())
if (groupConfigManager != null)
- CoreUtils.swallow(groupConfigManager.close(), this)
+ Utils.swallow(this.logger.underlying, () => groupConfigManager.close())
if (groupCoordinator != null)
- CoreUtils.swallow(groupCoordinator.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () =>
groupCoordinator.shutdown())
if (partitionMetadataClient != null)
- CoreUtils.swallow(partitionMetadataClient.close(), this)
+ Utils.swallow(this.logger.underlying, () =>
partitionMetadataClient.close())
if (shareCoordinator != null)
- CoreUtils.swallow(shareCoordinator.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () =>
shareCoordinator.shutdown())
if (autoTopicCreationManager != null)
- CoreUtils.swallow(autoTopicCreationManager.close(), this)
+ Utils.swallow(this.logger.underlying, () =>
autoTopicCreationManager.close())
if (assignmentsManager != null)
- CoreUtils.swallow(assignmentsManager.close(), this)
+ Utils.swallow(this.logger.underlying, () => assignmentsManager.close())
if (replicaManager != null)
- CoreUtils.swallow(replicaManager.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () => replicaManager.shutdown())
if (alterPartitionManager != null)
- CoreUtils.swallow(alterPartitionManager.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () =>
alterPartitionManager.shutdown())
if (forwardingManager != null)
- CoreUtils.swallow(forwardingManager.close(), this)
+ Utils.swallow(this.logger.underlying, () => forwardingManager.close())
if (clientToControllerChannelManager != null)
- CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () =>
clientToControllerChannelManager.shutdown())
if (logManager != null) {
val brokerEpoch = if (lifecycleManager != null)
lifecycleManager.brokerEpoch else -1
- CoreUtils.swallow(logManager.shutdown(brokerEpoch), this)
+ Utils.swallow(this.logger.underlying, () =>
logManager.shutdown(brokerEpoch))
}
// Close remote log manager to give a chance to any of its underlying
clients
@@ -859,21 +858,21 @@ class BrokerServer(
remoteLogManagerOpt.foreach(Utils.closeQuietly(_, "remote log manager"))
if (quotaManagers != null)
- CoreUtils.swallow(quotaManagers.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () => quotaManagers.shutdown())
if (socketServer != null)
- CoreUtils.swallow(socketServer.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () => socketServer.shutdown())
Utils.closeQuietly(brokerTopicStats, "broker topic stats")
Utils.closeQuietly(sharePartitionManager, "share partition manager")
if (persister != null)
- CoreUtils.swallow(persister.stop(), this)
+ Utils.swallow(this.logger.underlying, () => persister.stop())
if (lifecycleManager != null)
- CoreUtils.swallow(lifecycleManager.close(), this)
+ Utils.swallow(this.logger.underlying, () => lifecycleManager.close())
- CoreUtils.swallow(config.dynamicConfig.clear(), this)
+ Utils.swallow(this.logger.underlying, () => config.dynamicConfig.clear())
Utils.closeQuietly(clientMetricsManager, "client metrics manager")
sharedServer.stopForBroker()
info("shut down completed")
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index b19cf062f56..7b9291ce165 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -23,7 +23,7 @@ import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.{ClientQuotaMetadataManager,
DynamicConfigPublisher, KRaftMetadataCachePublisher}
import scala.collection.immutable
-import kafka.utils.{CoreUtils, Logging}
+import kafka.utils.Logging
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -449,7 +449,7 @@ class ControllerServer(
Utils.closeQuietly(registrationManager, "registration manager")
registrationManager = null
if (registrationChannelManager != null) {
- CoreUtils.swallow(registrationChannelManager.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () =>
registrationChannelManager.shutdown())
registrationChannelManager = null
}
metadataPublishers.forEach(p =>
sharedServer.loader.removeAndClosePublisher(p).get())
@@ -464,24 +464,24 @@ class ControllerServer(
Utils.closeQuietly(registrationsPublisher, "registrations publisher")
registrationsPublisher = null
if (socketServer != null)
- CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
+ Utils.swallow(this.logger.underlying, () =>
socketServer.stopProcessingRequests())
if (controller != null)
controller.beginShutdown()
if (socketServer != null)
- CoreUtils.swallow(socketServer.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () => socketServer.shutdown())
if (controllerApisHandlerPool != null)
- CoreUtils.swallow(controllerApisHandlerPool.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () =>
controllerApisHandlerPool.shutdown())
if (controllerApis != null)
- CoreUtils.swallow(controllerApis.close(), this)
+ Utils.swallow(this.logger.underlying, () => controllerApis.close())
if (quotaManagers != null)
- CoreUtils.swallow(quotaManagers.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () => quotaManagers.shutdown())
Utils.closeQuietly(controller, "controller")
Utils.closeQuietly(quorumControllerMetrics, "quorum controller metrics")
authorizerPlugin.foreach(Utils.closeQuietly(_, "authorizer plugin"))
createTopicPolicy.foreach(policy => Utils.closeQuietly(policy, "create
topic policy"))
alterConfigPolicy.foreach(policy => Utils.closeQuietly(policy, "alter
config policy"))
socketServerFirstBoundPortFuture.completeExceptionally(new
RuntimeException("shutting down"))
- CoreUtils.swallow(config.dynamicConfig.clear(), this)
+ Utils.swallow(this.logger.underlying, () => config.dynamicConfig.clear())
sharedServer.stopForController()
} catch {
case e: Throwable =>
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index f0b077b17f1..27156831b25 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -25,7 +25,7 @@ import kafka.log.LogManager
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.KafkaRaftManager
import kafka.server.DynamicBrokerConfig._
-import kafka.utils.{CoreUtils, Logging}
+import kafka.utils.Logging
import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.Endpoint
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
@@ -48,6 +48,7 @@ import
org.apache.kafka.server.config.{DynamicProducerStateManagerConfig, Replic
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin,
MetricConfigs}
import org.apache.kafka.server.telemetry.{ClientTelemetry,
ClientTelemetryExporterProvider}
+import org.apache.kafka.server.util.LockUtils.{inReadLock, inWriteLock}
import org.apache.kafka.snapshot.RecordsSnapshotReader
import org.apache.kafka.storage.internals.log.{LogCleaner, LogConfig}
@@ -368,23 +369,23 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
}
// Visibility for testing
- private[server] def currentKafkaConfig: KafkaConfig =
CoreUtils.inReadLock(lock) {
+ private[server] def currentKafkaConfig: KafkaConfig = inReadLock(lock, () =>
{
currentConfig
- }
+ })
- private[server] def currentDynamicBrokerConfigs: Map[String, String] =
CoreUtils.inReadLock(lock) {
+ private[server] def currentDynamicBrokerConfigs: Map[String, String] =
inReadLock(lock, () => {
dynamicBrokerConfigs.clone()
- }
+ })
- private[server] def currentDynamicDefaultConfigs: Map[String, String] =
CoreUtils.inReadLock(lock) {
+ private[server] def currentDynamicDefaultConfigs: Map[String, String] =
inReadLock(lock, () => {
dynamicDefaultConfigs.clone()
- }
+ })
- private[server] def clientTelemetryExporterPlugin:
Option[ClientTelemetryExporterPlugin] = CoreUtils.inReadLock(lock) {
+ private[server] def clientTelemetryExporterPlugin:
Option[ClientTelemetryExporterPlugin] = inReadLock(lock, () => {
telemetryExporterPluginOpt
- }
+ })
- private[server] def updateBrokerConfig(brokerId: Int, persistentProps:
Properties, doLog: Boolean = true): Unit = CoreUtils.inWriteLock(lock) {
+ private[server] def updateBrokerConfig(brokerId: Int, persistentProps:
Properties, doLog: Boolean = true): Unit = inWriteLock[Exception](lock, () => {
try {
val props = fromPersistentProps(persistentProps, perBrokerConfig = true)
dynamicBrokerConfigs.clear()
@@ -393,9 +394,9 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
} catch {
case e: Exception => error(s"Per-broker configs of $brokerId could not
be applied: ${persistentProps.keySet()}", e)
}
- }
+ })
- private[server] def updateDefaultConfig(persistentProps: Properties, doLog:
Boolean = true): Unit = CoreUtils.inWriteLock(lock) {
+ private[server] def updateDefaultConfig(persistentProps: Properties, doLog:
Boolean = true): Unit = inWriteLock[Exception](lock, () => {
try {
val props = fromPersistentProps(persistentProps, perBrokerConfig = false)
dynamicDefaultConfigs.clear()
@@ -404,7 +405,7 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
} catch {
case e: Exception => error(s"Cluster default configs could not be
applied: ${persistentProps.keySet()}", e)
}
- }
+ })
/**
* Config updates are triggered through actual changes in stored values.
@@ -414,7 +415,7 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
* the SSL configs have changed, then the update will be handled when
configuration changes are processed.
* At the moment, only listener configs are considered for reloading.
*/
- private[server] def reloadUpdatedFilesWithoutConfigChange(newProps:
Properties): Unit = CoreUtils.inWriteLock(lock) {
+ private[server] def reloadUpdatedFilesWithoutConfigChange(newProps:
Properties): Unit = inWriteLock[Exception](lock, () => {
reconfigurables.forEach(r => {
if (ReloadableFileConfigs.exists(r.reconfigurableConfigs.contains)) {
r match {
@@ -427,7 +428,7 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
}
}
})
- }
+ })
private[server] def fromPersistentProps(persistentProps: Properties,
perBrokerConfig: Boolean):
Properties = {
@@ -471,10 +472,10 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
newProps
}
- private[server] def validate(props: Properties, perBrokerConfig: Boolean):
Unit = CoreUtils.inReadLock(lock) {
+ private[server] def validate(props: Properties, perBrokerConfig: Boolean):
Unit = inReadLock(lock, () => {
val newProps = validatedKafkaProps(props, perBrokerConfig)
processReconfiguration(newProps, validateOnly = true)
- }
+ })
private def removeInvalidConfigs(props: Properties, perBrokerConfig:
Boolean): Unit = {
try {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2092d36d7f9..65097f53a5c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -20,8 +20,9 @@ package kafka.server
import java.util
import java.util.concurrent.TimeUnit
import java.util.Properties
-import kafka.utils.{CoreUtils, Logging}
+import kafka.utils.Logging
import kafka.utils.Implicits._
+import org.apache.commons.validator.routines.InetAddressValidator
import org.apache.kafka.common.{Endpoint, Reconfigurable}
import org.apache.kafka.common.config.{ConfigDef, ConfigException,
ConfigResource, TopicConfig}
import org.apache.kafka.common.config.ConfigDef.ConfigKey
@@ -143,6 +144,81 @@ object KafkaConfig {
}
output
}
+
+ def listenerListToEndPoints(listeners: java.util.List[String],
securityProtocolMap: java.util.Map[ListenerName, SecurityProtocol]):
Seq[Endpoint] = {
+ listenerListToEndPoints(listeners, securityProtocolMap,
requireDistinctPorts = true)
+ }
+
+ private def checkDuplicateListenerPorts(endpoints: Seq[Endpoint], listeners:
java.util.List[String]): Unit = {
+ val distinctPorts = endpoints.map(_.port).distinct
+ require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners: $listeners")
+ }
+
+ def listenerListToEndPoints(listeners: java.util.List[String],
securityProtocolMap: java.util.Map[ListenerName, SecurityProtocol],
requireDistinctPorts: Boolean): Seq[Endpoint] = {
+ def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean
= {
+ val inetAddressValidator = InetAddressValidator.getInstance()
+ (inetAddressValidator.isValidInet4Address(first) &&
inetAddressValidator.isValidInet6Address(second)) ||
+ (inetAddressValidator.isValidInet6Address(first) &&
inetAddressValidator.isValidInet4Address(second))
+ }
+
+ def validate(endPoints: Seq[Endpoint]): Unit = {
+ val distinctListenerNames = endPoints.map(_.listener).distinct
+ require(distinctListenerNames.size == endPoints.size, s"Each listener
must have a different name, listeners: $listeners")
+
+ val (duplicatePorts, _) = endPoints.filter {
+ // filter port 0 for unit tests
+ ep => ep.port != 0
+ }.groupBy(_.port).partition {
+ case (_, endpoints) => endpoints.size > 1
+ }
+
+ // Exception case, let's allow duplicate ports if one host is on IPv4
and the other one is on IPv6
+ val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+ case (port, eps) =>
+ (port, eps.partition(ep =>
+ ep.host != null &&
InetAddressValidator.getInstance().isValid(ep.host)
+ ))
+ }
+
+ // Iterate through every grouping of duplicates by port to see if they
are valid
+ duplicatePortsPartitionedByValidIps.foreach {
+ case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+ if (requireDistinctPorts)
+ checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)
+
+ duplicatesWithIpHosts match {
+ case eps if eps.isEmpty =>
+ case Seq(ep1, ep2) =>
+ if (requireDistinctPorts) {
+ val errorMessage = "If you have two listeners on " +
+ s"the same port then one needs to be IPv4 and the other
IPv6, listeners: $listeners, port: $port"
+ require(validateOneIsIpv4AndOtherIpv6(ep1.host, ep2.host),
errorMessage)
+
+ // If we reach this point it means that even though
duplicatesWithIpHosts in isolation can be valid, if
+ // there happens to be ANOTHER listener on this port without
an IP host (such as a null host) then its
+ // not valid.
+ if (duplicatesWithoutIpHosts.nonEmpty)
+ throw new IllegalArgumentException(errorMessage)
+ }
+ case _ =>
+ // Having more than 2 duplicate endpoints doesn't make sense
since we only have 2 IP stacks (one is IPv4
+ // and the other is IPv6)
+ if (requireDistinctPorts)
+ throw new IllegalArgumentException("Each listener must have a
different port unless exactly one listener has " +
+ s"an IPv4 address and the other IPv6 address, listeners:
$listeners, port: $port")
+ }
+ }
+ }
+
+ val endPoints = try {
+ SocketServerConfigs.listenerListToEndPoints(listeners,
securityProtocolMap).asScala
+ } catch {
+ case e: Exception =>
+ throw new IllegalArgumentException(s"Error creating broker listeners
from '$listeners': ${e.getMessage}", e)
+ }
+ validate(endPoints)
+ endPoints
+ }
}
/**
@@ -444,7 +520,7 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
}
def listeners: Seq[Endpoint] =
-
CoreUtils.listenerListToEndPoints(getList(SocketServerConfigs.LISTENERS_CONFIG),
effectiveListenerSecurityProtocolMap)
+
KafkaConfig.listenerListToEndPoints(getList(SocketServerConfigs.LISTENERS_CONFIG),
effectiveListenerSecurityProtocolMap)
def controllerListeners: Seq[Endpoint] =
listeners.filter(l => controllerListenerNames.contains(l.listener))
@@ -461,7 +537,7 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
def effectiveAdvertisedControllerListeners: Seq[Endpoint] = {
val advertisedListenersProp =
getList(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
val controllerAdvertisedListeners = if (advertisedListenersProp != null) {
- CoreUtils.listenerListToEndPoints(advertisedListenersProp,
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
+ KafkaConfig.listenerListToEndPoints(advertisedListenersProp,
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
.filter(l => controllerListenerNames.contains(l.listener))
} else {
Seq.empty
@@ -491,7 +567,7 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
// Use advertised listeners if defined, fallback to listeners otherwise
val advertisedListenersProp =
getList(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
val advertisedListeners = if (advertisedListenersProp != null) {
- CoreUtils.listenerListToEndPoints(advertisedListenersProp,
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
+ KafkaConfig.listenerListToEndPoints(advertisedListenersProp,
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
} else {
listeners
}
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index e3497a6ff88..270b1f30a6e 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -18,10 +18,10 @@ package kafka.server
import java.io.File
import java.util.concurrent.CompletableFuture
-import kafka.utils.{CoreUtils, Logging, Mx4jLoader}
+import kafka.utils.{Logging, Mx4jLoader}
import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.utils.{AppInfoParser, Time}
+import org.apache.kafka.common.utils.{AppInfoParser, Time, Utils}
import org.apache.kafka.common.{KafkaException, Uuid}
import org.apache.kafka.metadata.KafkaConfigSchema
import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory,
BootstrapMetadata}
@@ -103,7 +103,7 @@ class KafkaRaftServer(
// stops the raft client early on, which would disrupt broker shutdown.
broker.foreach(_.shutdown())
controller.foreach(_.shutdown())
- CoreUtils.swallow(AppInfoParser.unregisterAppInfo(Server.MetricsPrefix,
config.brokerId.toString, metrics), this)
+ Utils.swallow(this.logger.underlying, () =>
AppInfoParser.unregisterAppInfo(Server.MetricsPrefix, config.brokerId.toString,
metrics))
}
override def awaitShutdown(): Unit = {
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala
b/core/src/main/scala/kafka/server/SharedServer.scala
index 3acfc9bf0b9..a20745f35d8 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -20,7 +20,7 @@ package kafka.server
import kafka.metrics.KafkaMetricsReporter
import kafka.raft.KafkaRaftManager
import kafka.server.Server.MetricsPrefix
-import kafka.utils.{CoreUtils, Logging, VerifiableProperties}
+import kafka.utils.{Logging, VerifiableProperties}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
@@ -365,7 +365,7 @@ class SharedServer(
// Ideally, this would just resign our leadership, if we had it. But we
don't have an API in
// RaftManager for that yet, so shut down the RaftManager.
Option(raftManager).foreach(_raftManager => {
- CoreUtils.swallow(_raftManager.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () => _raftManager.shutdown())
raftManager = null
})
}
@@ -376,10 +376,10 @@ class SharedServer(
} else {
info("Stopping SharedServer")
if (loader != null) {
- CoreUtils.swallow(loader.beginShutdown(), this)
+ Utils.swallow(this.logger.underlying, () => loader.beginShutdown())
}
if (snapshotGenerator != null) {
- CoreUtils.swallow(snapshotGenerator.beginShutdown(), this)
+ Utils.swallow(this.logger.underlying, () =>
snapshotGenerator.beginShutdown())
}
Utils.closeQuietly(loader, "loader")
loader = null
@@ -388,7 +388,7 @@ class SharedServer(
Utils.closeQuietly(snapshotGenerator, "snapshot generator")
snapshotGenerator = null
if (raftManager != null) {
- CoreUtils.swallow(raftManager.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () => raftManager.shutdown())
raftManager = null
}
Utils.closeQuietly(controllerServerMetrics, "controller server metrics")
@@ -399,7 +399,7 @@ class SharedServer(
nodeMetrics = null
Utils.closeQuietly(metrics, "metrics")
metrics = null
- CoreUtils.swallow(AppInfoParser.unregisterAppInfo(MetricsPrefix,
sharedServerConfig.nodeId.toString, metrics), this)
+ Utils.swallow(this.logger.underlying, () =>
AppInfoParser.unregisterAppInfo(MetricsPrefix,
sharedServerConfig.nodeId.toString, metrics))
started = false
}
}
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala
b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 3f12d5c009e..8c3f48ef27b 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -24,7 +24,7 @@ import joptsimple.{OptionException, OptionSpec}
import kafka.network.SocketServer
import kafka.raft.KafkaRaftManager
import kafka.server.{KafkaConfig, KafkaRequestHandlerPool,
KafkaRequestHandlerPoolFactory}
-import kafka.utils.{CoreUtils, Logging}
+import kafka.utils.Logging
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing
@@ -143,13 +143,13 @@ class TestRaftServer(
def shutdown(): Unit = {
if (raftManager != null)
- CoreUtils.swallow(raftManager.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () => raftManager.shutdown())
if (workloadGenerator != null)
- CoreUtils.swallow(workloadGenerator.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () => workloadGenerator.shutdown())
if (dataPlaneRequestHandlerPool != null)
- CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () =>
dataPlaneRequestHandlerPool.shutdown())
if (socketServer != null)
- CoreUtils.swallow(socketServer.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () => socketServer.shutdown())
Utils.closeQuietly(metrics, "metrics")
shutdownLatch.countDown()
}
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala
b/core/src/main/scala/kafka/utils/CoreUtils.scala
deleted file mode 100755
index 66f9bd48657..00000000000
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import java.io.File
-import java.util.concurrent.locks.{Lock, ReadWriteLock}
-import java.lang.management.ManagementFactory
-import com.typesafe.scalalogging.Logger
-
-import javax.management.ObjectName
-import scala.collection.Seq
-import org.apache.commons.validator.routines.InetAddressValidator
-import org.apache.kafka.common.Endpoint
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.network.SocketServerConfigs
-import org.slf4j.event.Level
-
-import scala.jdk.CollectionConverters._
-
-/**
- * General helper functions!
- *
- * This is for general helper functions that aren't specific to Kafka logic.
Things that should have been included in
- * the standard library etc.
- *
- * If you are making a new helper function and want to add it to this class
please ensure the following:
- * 1. It has documentation
- * 2. It is the most general possible utility, not just the thing you needed
in one particular place
- * 3. You have tests for it if it is nontrivial in any way
- */
-object CoreUtils {
- private val logger = Logger(getClass)
-
- private val inetAddressValidator = InetAddressValidator.getInstance()
-
- /**
- * Do the given action and log any exceptions thrown without rethrowing
them.
- *
- * @param action The action to execute.
- * @param logging The logging instance to use for logging the thrown
exception.
- * @param logLevel The log level to use for logging.
- */
- @noinline // inlining this method is not typically useful and it triggers
spurious spotbugs warnings
- def swallow(action: => Unit, logging: Logging, logLevel: Level =
Level.WARN): Unit = {
- try {
- action
- } catch {
- case e: Throwable => logLevel match {
- case Level.ERROR => logging.error(e.getMessage, e)
- case Level.WARN => logging.warn(e.getMessage, e)
- case Level.INFO => logging.info(e.getMessage, e)
- case Level.DEBUG => logging.debug(e.getMessage, e)
- case Level.TRACE => logging.trace(e.getMessage, e)
- }
- }
- }
-
- /**
- * Recursively delete the list of files/directories and any subfiles (if any
exist)
- * @param files list of files to be deleted
- */
- def delete(files: java.util.List[String]): Unit = files.forEach(f =>
Utils.delete(new File(f)))
-
- /**
- * Register the given mbean with the platform mbean server,
- * unregistering any mbean that was there before. Note,
- * this method will not throw an exception if the registration
- * fails (since there is nothing you can do and it isn't fatal),
- * instead it just returns false indicating the registration failed.
- * @param mbean The object to register as an mbean
- * @param name The name to register this mbean with
- * @return true if the registration succeeded
- */
- def registerMBean(mbean: Object, name: String): Boolean = {
- try {
- val mbs = ManagementFactory.getPlatformMBeanServer
- mbs synchronized {
- val objName = new ObjectName(name)
- if (mbs.isRegistered(objName))
- mbs.unregisterMBean(objName)
- mbs.registerMBean(mbean, objName)
- true
- }
- } catch {
- case e: Exception =>
- logger.error(s"Failed to register Mbean $name", e)
- false
- }
- }
-
- /**
- * Execute the given function inside the lock
- */
- def inLock[T](lock: Lock)(fun: => T): T = {
- lock.lock()
- try {
- fun
- } finally {
- lock.unlock()
- }
- }
-
- def inReadLock[T](lock: ReadWriteLock)(fun: => T): T =
inLock[T](lock.readLock)(fun)
-
- def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T =
inLock[T](lock.writeLock)(fun)
-
- def listenerListToEndPoints(listeners: java.util.List[String],
securityProtocolMap: java.util.Map[ListenerName, SecurityProtocol]):
Seq[Endpoint] = {
- listenerListToEndPoints(listeners, securityProtocolMap,
requireDistinctPorts = true)
- }
-
- private def checkDuplicateListenerPorts(endpoints: Seq[Endpoint], listeners:
java.util.List[String]): Unit = {
- val distinctPorts = endpoints.map(_.port).distinct
- require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners: $listeners")
- }
-
- def listenerListToEndPoints(listeners: java.util.List[String],
securityProtocolMap: java.util.Map[ListenerName, SecurityProtocol],
requireDistinctPorts: Boolean): Seq[Endpoint] = {
- def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
- (inetAddressValidator.isValidInet4Address(first) &&
inetAddressValidator.isValidInet6Address(second)) ||
- (inetAddressValidator.isValidInet6Address(first) &&
inetAddressValidator.isValidInet4Address(second))
-
- def validate(endPoints: Seq[Endpoint]): Unit = {
- val distinctListenerNames = endPoints.map(_.listener).distinct
- require(distinctListenerNames.size == endPoints.size, s"Each listener
must have a different name, listeners: $listeners")
-
- val (duplicatePorts, _) = endPoints.filter {
- // filter port 0 for unit tests
- ep => ep.port != 0
- }.groupBy(_.port).partition {
- case (_, endpoints) => endpoints.size > 1
- }
-
- // Exception case, let's allow duplicate ports if one host is on IPv4
and the other one is on IPv6
- val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
- case (port, eps) =>
- (port, eps.partition(ep =>
- ep.host != null && inetAddressValidator.isValid(ep.host)
- ))
- }
-
- // Iterate through every grouping of duplicates by port to see if they
are valid
- duplicatePortsPartitionedByValidIps.foreach {
- case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
- if (requireDistinctPorts)
- checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)
-
- duplicatesWithIpHosts match {
- case eps if eps.isEmpty =>
- case Seq(ep1, ep2) =>
- if (requireDistinctPorts) {
- val errorMessage = "If you have two listeners on " +
- s"the same port then one needs to be IPv4 and the other
IPv6, listeners: $listeners, port: $port"
- require(validateOneIsIpv4AndOtherIpv6(ep1.host, ep2.host),
errorMessage)
-
- // If we reach this point it means that even though
duplicatesWithIpHosts in isolation can be valid, if
- // there happens to be ANOTHER listener on this port without
an IP host (such as a null host) then its
- // not valid.
- if (duplicatesWithoutIpHosts.nonEmpty)
- throw new IllegalArgumentException(errorMessage)
- }
- case _ =>
- // Having more than 2 duplicate endpoints doesn't make sense
since we only have 2 IP stacks (one is IPv4
- // and the other is IPv6)
- if (requireDistinctPorts)
- throw new IllegalArgumentException("Each listener must have a
different port unless exactly one listener has " +
- s"an IPv4 address and the other IPv6 address, listeners:
$listeners, port: $port")
- }
- }
- }
-
- val endPoints = try {
- SocketServerConfigs.listenerListToEndPoints(listeners,
securityProtocolMap).asScala
- } catch {
- case e: Exception =>
- throw new IllegalArgumentException(s"Error creating broker listeners
from '$listeners': ${e.getMessage}", e)
- }
- validate(endPoints)
- endPoints
- }
-}
diff --git a/core/src/main/scala/kafka/utils/Logging.scala
b/core/src/main/scala/kafka/utils/Logging.scala
index e08a6873fc1..cc51cdcc422 100755
--- a/core/src/main/scala/kafka/utils/Logging.scala
+++ b/core/src/main/scala/kafka/utils/Logging.scala
@@ -18,6 +18,7 @@
package kafka.utils
import com.typesafe.scalalogging.Logger
+import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.logger.LoggingController
import org.slf4j.{LoggerFactory, Marker, MarkerFactory}
@@ -29,7 +30,7 @@ object Log4jControllerRegistration {
private def registerMBean(mbean: LoggingController, typeAttr: String): Unit
= {
try {
- CoreUtils.registerMBean(mbean, s"kafka:type=$typeAttr")
+ Utils.registerMBean(mbean, s"kafka:type=$typeAttr")
logger.info("Registered `kafka:type={}` MBean", typeAttr)
} catch {
case e: Exception => logger.warn("Couldn't register `kafka:type={}`
MBean", typeAttr, e)
diff --git
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 53bda6a67c9..d3821ee0eee 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -23,7 +23,7 @@ import java.util
import java.util.{Locale, Optional, OptionalInt, Properties, stream}
import java.util.concurrent.{CompletableFuture, TimeUnit}
import javax.security.auth.login.Configuration
-import kafka.utils.{CoreUtils, Logging, TestInfoUtils, TestUtils}
+import kafka.utils.{Logging, TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin.AdminClientUnitTestEnv
import org.apache.kafka.clients.consumer.GroupProtocol
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
@@ -31,7 +31,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.utils.{Exit, Time}
+import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.common.{DirectoryId, Uuid}
import
org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID,
REQUIRE_METADATA_LOG_DIR}
import org.apache.kafka.metadata.properties.{MetaProperties,
MetaPropertiesEnsemble, MetaPropertiesVersion}
@@ -119,15 +119,15 @@ class KRaftQuorumImplementation(
broker
} catch {
case e: Throwable => {
- if (broker != null) CoreUtils.swallow(broker.shutdown(), log)
- CoreUtils.swallow(sharedServer.stopForBroker(), log)
+ if (broker != null) Utils.swallow(() => broker.shutdown())
+ Utils.swallow(() => sharedServer.stopForBroker())
throw e
}
}
}
override def shutdown(): Unit = {
- CoreUtils.swallow(controllerServer.shutdown(), log)
+ Utils.swallow(() => controllerServer.shutdown())
}
}
@@ -229,7 +229,7 @@ abstract class QuorumTestHarness extends Logging {
def shutdownKRaftController(): Unit = {
// Note that the RaftManager instance is left running; it will be shut
down in tearDown()
val kRaftQuorumImplementation = asKRaft()
- CoreUtils.swallow(kRaftQuorumImplementation.controllerServer.shutdown(),
kRaftQuorumImplementation.log)
+ Utils.swallow(this.logger.underlying, () =>
kRaftQuorumImplementation.controllerServer.shutdown())
}
def addFormatterSettings(formatter: Formatter): Unit = {}
@@ -327,8 +327,8 @@ abstract class QuorumTestHarness extends Logging {
controllerServer.startup()
} catch {
case e: Throwable =>
- if (controllerServer != null)
CoreUtils.swallow(controllerServer.shutdown(), this)
- CoreUtils.swallow(sharedServer.stopForController(), this)
+ if (controllerServer != null) Utils.swallow(this.logger.underlying, ()
=> controllerServer.shutdown())
+ Utils.swallow(this.logger.underlying, () =>
sharedServer.stopForController())
throw e
}
new KRaftQuorumImplementation(
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index 9eeed9b86f5..5026e7364fb 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -18,7 +18,7 @@
package kafka.server
import kafka.server.QuotaFactory.QuotaManagers
-import kafka.utils.{CoreUtils, Logging, TestUtils}
+import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.{TopicIdPartition, Uuid}
@@ -29,6 +29,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.utils.Utils
import org.apache.kafka.image.{MetadataDelta, MetadataImage,
MetadataProvenance}
import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion,
OffsetAndEpoch}
@@ -122,8 +123,8 @@ class LocalLeaderEndPointTest extends Logging {
@AfterEach
def tearDown(): Unit = {
- CoreUtils.swallow(replicaManager.shutdown(checkpointHW = false), this)
- CoreUtils.swallow(quotaManager.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () =>
replicaManager.shutdown(checkpointHW = false))
+ Utils.swallow(this.logger.underlying, () => quotaManager.shutdown())
}
@Test
diff --git
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index 42cebdcb300..61c8615d33d 100644
---
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -69,11 +69,11 @@ abstract class AbstractCoordinatorConcurrencyTest[M <:
CoordinatorMember] extend
@AfterEach
def tearDown(): Unit = {
- CoreUtils.swallow(replicaManager.shutdown(false), this)
- CoreUtils.swallow(executor.shutdownNow(), this)
+ Utils.swallow(this.logger.underlying, () => replicaManager.shutdown(false))
+ Utils.swallow(this.logger.underlying, () => executor.shutdownNow())
Utils.closeQuietly(timer, "mock timer")
- CoreUtils.swallow(scheduler.shutdown(), this)
- CoreUtils.swallow(time.scheduler.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () => scheduler.shutdown())
+ Utils.swallow(this.logger.underlying, () => time.scheduler.shutdown())
}
/**
diff --git
a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 33e85e30a7a..11271d17c93 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -24,7 +24,7 @@ import scala.util.Random
import scala.jdk.CollectionConverters._
import scala.collection.{Map, Seq}
import kafka.server.{KafkaBroker, KafkaConfig, QuorumTestHarness}
-import kafka.utils.{CoreUtils, TestInfoUtils, TestUtils}
+import kafka.utils.{TestInfoUtils, TestUtils}
import kafka.utils.TestUtils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
@@ -42,10 +42,13 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import com.yammer.metrics.core.Meter
+import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.LeaderConstants
import org.apache.kafka.server.common.MetadataVersion
import org.apache.logging.log4j.core.config.Configurator
+import java.io.File
+
class UncleanLeaderElectionTest extends QuorumTestHarness {
val brokerId1 = 0
val brokerId2 = 1
@@ -89,7 +92,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
@AfterEach
override def tearDown(): Unit = {
brokers.foreach(broker => shutdownBroker(broker))
- brokers.foreach(broker => CoreUtils.delete(broker.config.logDirs))
+ brokers.foreach(broker => broker.config.logDirs.forEach(f =>
Utils.delete(new File(f))))
// restore log levels
Configurator.setLevel(kafkaApisLogger.getName, Level.ERROR)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 318a0c66140..d142a4e64de 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -18,7 +18,7 @@
package kafka.log
import kafka.server.KafkaConfig
-import kafka.utils.{CoreUtils, Logging, TestUtils}
+import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
@@ -69,7 +69,7 @@ class LogCleanerTest extends Logging {
@AfterEach
def teardown(): Unit = {
- CoreUtils.swallow(time.scheduler.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () => time.scheduler.shutdown())
Utils.delete(tmpdir)
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 34472d4ca68..1a58f52ea8d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -22,7 +22,7 @@ import kafka.coordinator.transaction.{InitProducerIdResult,
TransactionCoordinat
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.share.SharePartitionManager
-import kafka.utils.{CoreUtils, Logging, TestUtils}
+import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.clients.consumer.AcknowledgeType
@@ -157,9 +157,9 @@ class KafkaApisTest extends Logging {
@AfterEach
def tearDown(): Unit = {
- CoreUtils.swallow(quotas.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () => quotas.shutdown())
if (kafkaApis != null)
- CoreUtils.swallow(kafkaApis.close(), this)
+ Utils.swallow(this.logger.underlying, () => kafkaApis.close())
TestUtils.clearYammerMetrics()
metrics.close()
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 2e366a97117..2b1830e136d 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -21,7 +21,7 @@ import java.net.InetSocketAddress
import java.util
import java.util.{Arrays, Collections, Properties}
import kafka.utils.TestUtils.assertBadConfigContainingMessage
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.TestUtils
import org.apache.kafka.common.{Endpoint, Node}
import org.apache.kafka.common.config.{AbstractConfig, ConfigException,
SaslConfigs, SecurityConfig, SslConfigs, TopicConfig}
import org.apache.kafka.common.metrics.Sensor
@@ -604,7 +604,7 @@ class KafkaConfigTest {
private def listenerListToEndPoints(listenerList: java.util.List[String],
securityProtocolMap: util.Map[ListenerName,
SecurityProtocol] = SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO) =
- CoreUtils.listenerListToEndPoints(listenerList, securityProtocolMap)
+ KafkaConfig.listenerListToEndPoints(listenerList, securityProtocolMap)
@Test
def testListenerDefaults(): Unit = {
diff --git
a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
index c8692661134..e4bcb3d49a5 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
@@ -18,14 +18,17 @@ package kafka.server
import java.util
import java.util.concurrent.atomic.AtomicReference
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.TestUtils
import org.apache.kafka.common.metrics.{KafkaMetric, MetricsContext,
MetricsReporter}
+import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.config.ServerConfigs
import org.apache.kafka.server.metrics.MetricConfigs
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.Assertions._
+import java.io.File
+
object KafkaMetricsReporterTest {
val setupError = new AtomicReference[String]("")
@@ -89,7 +92,7 @@ class KafkaMetricsReporterTest extends QuorumTestHarness {
@AfterEach
override def tearDown(): Unit = {
broker.shutdown()
- CoreUtils.delete(config.logDirs)
+ config.logDirs().forEach(f => Utils.delete(new File(f)))
super.tearDown()
}
}
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index cbc6df7180e..45e7cc2e18d 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -22,7 +22,7 @@ import java.util.Collections
import java.util.concurrent.{ExecutionException, TimeUnit}
import kafka.api.IntegrationTestHarness
import kafka.utils.TestUtils.{Checkpoint, LogDirFailureType, Roll,
waitUntilTrue}
-import kafka.utils.{CoreUtils, TestInfoUtils, TestUtils}
+import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
@@ -220,7 +220,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
// Make log directory of the partition on the leader broker inaccessible by replacing it with a file
val localLog = leaderBroker.replicaManager.localLogOrException(partition)
val logDir = localLog.dir.getParentFile
- CoreUtils.swallow(Utils.delete(logDir), this)
+ Utils.swallow(this.logger.underlying, () => Utils.delete(logDir))
Files.createFile(logDir.toPath)
assertTrue(logDir.isFile)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 0c5010d7de9..08d2eb4299d 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.{CompletableFuture, Executors, LinkedBlockingQueue,
import java.util.{Optional, Properties}
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.TestUtils.waitUntilTrue
-import kafka.utils.{CoreUtils, Logging, TestUtils}
+import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.common
import org.apache.kafka.common.metadata.{FeatureLevelRecord, PartitionChangeRecord, PartitionRecord, RegisterBrokerRecord, TopicRecord}
import org.apache.kafka.common.metrics.Metrics
@@ -68,14 +68,14 @@ class ReplicaManagerConcurrencyTest extends Logging {
@AfterEach
def cleanup(): Unit = {
- CoreUtils.swallow(tasks.foreach(_.shutdown()), this)
- CoreUtils.swallow(executor.shutdownNow(), this)
- CoreUtils.swallow(executor.awaitTermination(5, TimeUnit.SECONDS), this)
- CoreUtils.swallow(channel.shutdown(), this)
- CoreUtils.swallow(replicaManager.shutdown(checkpointHW = false), this)
- CoreUtils.swallow(quotaManagers.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () => tasks.foreach(_.shutdown()))
+ Utils.swallow(this.logger.underlying, () => executor.shutdownNow())
+ Utils.swallow(this.logger.underlying, () => executor.awaitTermination(5, TimeUnit.SECONDS))
+ Utils.swallow(this.logger.underlying, () => channel.shutdown())
+ Utils.swallow(this.logger.underlying, () => replicaManager.shutdown(checkpointHW = false))
+ Utils.swallow(this.logger.underlying, () => quotaManagers.shutdown())
Utils.closeQuietly(metrics, "metrics")
- CoreUtils.swallow(time.scheduler.shutdown(), this)
+ Utils.swallow(this.logger.underlying, () => time.scheduler.shutdown())
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index c8a137a4db7..0f3125685ad 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -16,7 +16,7 @@
*/
package kafka.server
-import kafka.utils.{CoreUtils, TestInfoUtils, TestUtils}
+import kafka.utils.{TestInfoUtils, TestUtils}
import java.io.File
import java.util.concurrent.CancellationException
@@ -25,7 +25,7 @@ import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer}
-import org.apache.kafka.common.utils.Exit
+import org.apache.kafka.common.utils.{Exit, Utils}
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.raft.KRaftConfigs
import org.apache.kafka.server.config.ServerLogConfigs
@@ -177,7 +177,7 @@ class ServerShutdownTest extends KafkaServerTestHarness {
def testShutdownWithKRaftControllerUnavailable(): Unit = {
shutdownKRaftController()
killBroker(0, Duration.ofSeconds(1))
- CoreUtils.delete(broker.config.logDirs)
+ broker.config.logDirs.forEach(f => Utils.delete(new File(f)))
verifyNonDaemonThreadsStatus()
}
diff --git a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
deleted file mode 100755
index 73a2403870f..00000000000
--- a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import java.util.concurrent.locks.ReentrantLock
-import java.util.regex.Pattern
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-import kafka.utils.CoreUtils.inLock
-import org.apache.kafka.common.KafkaException
-import org.slf4j.event.Level
-
-
-class CoreUtilsTest extends Logging {
-
- val clusterIdPattern: Pattern = Pattern.compile("[a-zA-Z0-9_\\-]+")
-
- @Test
- def testSwallow(): Unit = {
- var loggedMessage: Option[String] = None
- val testLogging: Logging = new Logging {
- override def info(msg: => String, e: => Throwable): Unit = {
- loggedMessage = Some(msg+Level.INFO)
- }
- override def debug(msg: => String, e: => Throwable): Unit = {
- loggedMessage = Some(msg+Level.DEBUG)
- }
- override def warn(msg: => String, e: => Throwable): Unit = {
- loggedMessage = Some(msg+Level.WARN)
- }
- override def error(msg: => String, e: => Throwable): Unit = {
- loggedMessage = Some(msg+Level.ERROR)
- }
- override def trace(msg: => String, e: => Throwable): Unit = {
- loggedMessage = Some(msg+Level.TRACE)
- }
- }
-
- CoreUtils.swallow(throw new KafkaException("test"), testLogging, Level.TRACE)
- assertEquals(Some("test"+Level.TRACE), loggedMessage)
- CoreUtils.swallow(throw new KafkaException("test"), testLogging, Level.DEBUG)
- assertEquals(Some("test"+Level.DEBUG), loggedMessage)
- CoreUtils.swallow(throw new KafkaException("test"), testLogging, Level.INFO)
- assertEquals(Some("test"+Level.INFO), loggedMessage)
- CoreUtils.swallow(throw new KafkaException("test"), testLogging, Level.WARN)
- assertEquals(Some("test"+Level.WARN),loggedMessage)
- CoreUtils.swallow(throw new KafkaException("test"), testLogging, Level.ERROR)
- assertEquals(Some("test"+Level.ERROR),loggedMessage)
- }
-
- @Test
- def testInLock(): Unit = {
- val lock = new ReentrantLock()
- val result = inLock(lock) {
- assertTrue(lock.isHeldByCurrentThread, "Should be in lock")
- 1 + 1
- }
- assertEquals(2, result)
- assertFalse(lock.isLocked, "Should be unlocked")
- }
-}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index e53e10d80d1..e08bbe0be71 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -43,6 +43,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.serialization._
+import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.utils.Utils.formatAddress
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
@@ -195,7 +196,7 @@ object TestUtils extends Logging {
val future = Future.traverse(brokers) { s =>
Future {
s.shutdown()
- if (deleteLogDirs) CoreUtils.delete(s.config.logDirs)
+ if (deleteLogDirs) s.config.logDirs.forEach(f => Utils.delete(new File(f)))
}
}
Await.result(future, FiniteDuration(5, TimeUnit.MINUTES))
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java b/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java
index 86338726d5e..16bd77d944a 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java
@@ -18,6 +18,7 @@ package org.apache.kafka.server.util;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
/**
* A utility class providing helper methods for working with {@link Lock} objects.
@@ -59,6 +60,22 @@ public class LockUtils {
}
}
+ /**
+ * Executes the given {@link ThrowingSupplier} within the context of the read lock of the specified {@link ReadWriteLock}.
+ * see {@link LockUtils#inLock(Lock, ThrowingSupplier)}
+ */
+ public static <T, E extends Exception> T inReadLock(ReadWriteLock lock, ThrowingSupplier<T, E> supplier) throws E {
+ return inLock(lock.readLock(), supplier);
+ }
+
+ /**
+ * Executes the given {@link ThrowingSupplier} within the context of the write lock of the specified {@link ReadWriteLock}.
+ * see {@link LockUtils#inLock(Lock, ThrowingSupplier)}
+ */
+ public static <T, E extends Exception> T inWriteLock(ReadWriteLock lock, ThrowingSupplier<T, E> supplier) throws E {
+ return inLock(lock.writeLock(), supplier);
+ }
+
/**
* Executes the given {@link ThrowingRunnable} within the context of the specified {@link Lock}.
* The lock is acquired before executing the runnable and released after the execution,
@@ -81,4 +98,20 @@ public class LockUtils {
lock.unlock();
}
}
+
+ /**
+ * Executes the given {@link ThrowingRunnable} within the context of the read lock of the specified {@link ReadWriteLock}.
+ * see {@link LockUtils#inLock(Lock, ThrowingRunnable)}
+ */
+ public static <E extends Exception> void inReadLock(ReadWriteLock lock, ThrowingRunnable<E> runnable) throws E {
+ inLock(lock.readLock(), runnable);
+ }
+
+ /**
+ * Executes the given {@link ThrowingRunnable} within the context of the write lock of the specified {@link ReadWriteLock}.
+ * see {@link LockUtils#inLock(Lock, ThrowingRunnable)}
+ */
+ public static <E extends Exception> void inWriteLock(ReadWriteLock lock, ThrowingRunnable<E> runnable) throws E {
+ inLock(lock.writeLock(), runnable);
+ }
}