This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 718202dbf4b KAFKA-15853: Delete CoreUtils.scala and migrate logic to Java (#21289)
718202dbf4b is described below

commit 718202dbf4b5dde65365051566a2d209da83f248
Author: Uladzislau Blok <[email protected]>
AuthorDate: Wed Jan 21 15:48:18 2026 +0100

    KAFKA-15853: Delete CoreUtils.scala and migrate logic to Java (#21289)
    
    
    Reviewers: Mickael Maison <[email protected]>
---
 .../java/org/apache/kafka/common/utils/Utils.java  |  60 ++++++-
 core/src/main/scala/kafka/cluster/Partition.scala  | 122 ++++++-------
 .../transaction/TransactionStateManager.scala      |  59 +++----
 core/src/main/scala/kafka/log/LogManager.scala     |   8 +-
 .../scala/kafka/metrics/KafkaMetricsReporter.scala |   4 +-
 .../main/scala/kafka/network/SocketServer.scala    |  10 +-
 .../main/scala/kafka/raft/KafkaRaftManager.scala   |   9 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |  59 +++----
 .../src/main/scala/kafka/server/BrokerServer.scala |  43 +++--
 .../main/scala/kafka/server/ControllerServer.scala |  16 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  35 ++--
 core/src/main/scala/kafka/server/KafkaConfig.scala |  84 ++++++++-
 .../main/scala/kafka/server/KafkaRaftServer.scala  |   6 +-
 .../src/main/scala/kafka/server/SharedServer.scala |  12 +-
 .../main/scala/kafka/tools/TestRaftServer.scala    |  10 +-
 core/src/main/scala/kafka/utils/CoreUtils.scala    | 196 ---------------------
 core/src/main/scala/kafka/utils/Logging.scala      |   3 +-
 .../kafka/server/QuorumTestHarness.scala           |  16 +-
 .../kafka/server/LocalLeaderEndPointTest.scala     |   7 +-
 .../AbstractCoordinatorConcurrencyTest.scala       |   8 +-
 .../integration/UncleanLeaderElectionTest.scala    |   7 +-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |   4 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   6 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |   4 +-
 .../kafka/server/KafkaMetricsReporterTest.scala    |   7 +-
 .../unit/kafka/server/LogDirFailureTest.scala      |   4 +-
 .../server/ReplicaManagerConcurrencyTest.scala     |  16 +-
 .../unit/kafka/server/ServerShutdownTest.scala     |   6 +-
 .../scala/unit/kafka/utils/CoreUtilsTest.scala     |  76 --------
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   3 +-
 .../org/apache/kafka/server/util/LockUtils.java    |  33 ++++
 31 files changed, 410 insertions(+), 523 deletions(-)
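
The changes follow one recurring shape: call sites that passed a code block to CoreUtils' Scala helpers (by-name parameters) now pass an explicit () => ... function literal to the Java replacements (Utils.swallow, Utils.registerMBean, LockUtils.inLock/inReadLock/inWriteLock), which Scala converts to the corresponding Java functional interfaces. As a minimal before/after sketch of the swallow change, taken from the LogManager.scala hunk below (this.logger.underlying is the SLF4J logger exposed by the kafka.utils.Logging trait):

    // Before: CoreUtils.swallow took the action by name plus a Logging instance.
    CoreUtils.swallow(cleaner.shutdown(), this)

    // After: Utils.swallow takes an org.slf4j.Logger and a SwallowAction lambda.
    Utils.swallow(this.logger.underlying, () => cleaner.shutdown())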

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 8162b75b4d4..51cb86f5c88 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -35,6 +35,7 @@ import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.VarHandle;
+import java.lang.management.ManagementFactory;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Modifier;
@@ -93,6 +94,9 @@ import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
 public final class Utils {
 
     private Utils() {}
@@ -1035,14 +1039,26 @@ public final class Utils {
         void run() throws Throwable;
     }
 
-    public static void swallow(final Logger log, final Level level, final String what, final SwallowAction code) {
-        swallow(log, level, what, code, null);
+    public static void swallow(final SwallowAction code) {
+        swallow(log, Level.WARN, "Exception while executing the action", code, null);
+    }
+
+    public static void swallow(final Logger log, final SwallowAction code) {
+        swallow(log, Level.WARN, "Exception while executing the action", code, null);
+    }
+
+    public static void swallow(final Logger log, final Level level, final SwallowAction code) {
+        swallow(log, level, "Exception while executing the action", code, null);
+    }
+
+    public static void swallow(final Logger log, final Level level, final String errorMessage, final SwallowAction code) {
+        swallow(log, level, errorMessage, code, null);
     }
 
     /**
      * Run the supplied code. If an exception is thrown, it is swallowed and 
registered to the firstException parameter.
      */
-    public static void swallow(final Logger log, final Level level, final 
String what, final SwallowAction code,
+    public static void swallow(final Logger log, final Level level, final 
String errorMessage, final SwallowAction code,
                                final AtomicReference<Throwable> 
firstException) {
         if (code != null) {
             try {
@@ -1050,20 +1066,20 @@ public final class Utils {
             } catch (Throwable t) {
                 switch (level) {
                     case INFO:
-                        log.info(what, t);
+                        log.info(errorMessage, t);
                         break;
                     case DEBUG:
-                        log.debug(what, t);
+                        log.debug(errorMessage, t);
                         break;
                     case ERROR:
-                        log.error(what, t);
+                        log.error(errorMessage, t);
                         break;
                     case TRACE:
-                        log.trace(what, t);
+                        log.trace(errorMessage, t);
                         break;
                     case WARN:
                     default:
-                        log.warn(what, t);
+                        log.warn(errorMessage, t);
                 }
                 if (firstException != null)
                     firstException.compareAndSet(null, t);
@@ -1723,6 +1739,34 @@ public final class Utils {
         return all;
     }
 
+    /**
+     * Register the given MBean with the platform MBean server,
+     * unregistering any MBean previously registered under the same name.
+     * Note that this method does not throw an exception if the registration
+     * fails (since there is nothing the caller can do and it isn't fatal);
+     * instead it returns false to indicate that the registration failed.
+     *
+     * @param mbean The object to register as an MBean
+     * @param name  The name to register this MBean with
+     * @return true if the registration succeeded
+     */
+    public static boolean registerMBean(Object mbean, String name) {
+        try {
+            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+            synchronized (mbs) {
+                ObjectName objName = new ObjectName(name);
+                if (mbs.isRegistered(objName)) {
+                    mbs.unregisterMBean(objName);
+                }
+                mbs.registerMBean(mbean, objName);
+                return true;
+            }
+        } catch (Exception e) {
+            log.error("Failed to register MBean with name {}", name, e);
+            return false;
+        }
+    }
+
     /**
      * A runnable that can throw checked exception.
      */
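
The new swallow overloads above default the level to WARN and the message to a generic one; the pre-existing five-argument variant still records the first failure in an AtomicReference so that shutdown paths can rethrow it after every step has run. A hedged Scala sketch of how the overloads compose; log, resourceA and resourceB are hypothetical placeholders rather than names from this patch:

    import java.util.concurrent.atomic.AtomicReference
    import org.apache.kafka.common.utils.Utils
    import org.slf4j.LoggerFactory
    import org.slf4j.event.Level

    val log = LoggerFactory.getLogger("shutdown-sketch")    // hypothetical logger
    val resourceA = new java.io.StringReader("a")            // hypothetical closeable resources
    val resourceB = new java.io.StringReader("b")
    val firstException = new AtomicReference[Throwable]()

    // Each step is attempted; a failure is logged at the given level and remembered.
    Utils.swallow(log, Level.WARN, "Error closing resource A", () => resourceA.close(), firstException)
    Utils.swallow(log, Level.ERROR, "Error closing resource B", () => resourceB.close(), firstException)

    // Rethrow the first failure once every step has had a chance to run.
    if (firstException.get() != null)
      throw firstException.get()
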
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 76a4b247edd..718a9feba44 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.{CompletableFuture, 
ConcurrentHashMap, CopyOnWriteAr
 import kafka.log._
 import kafka.server._
 import kafka.server.share.DelayedShareFetch
-import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
 import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicIdPartition, 
TopicPartition, Uuid}
 import org.apache.kafka.common.errors._
@@ -35,7 +34,7 @@ import 
org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests._
 import 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
 UNDEFINED_EPOCH_OFFSET}
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.logger.StateChangeLogger
 import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, 
MetadataCache, PartitionRegistration}
 import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
@@ -48,6 +47,7 @@ import 
org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperation
 import org.apache.kafka.server.replica.Replica
 import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, 
UnexpectedAppendOffsetException}
+import org.apache.kafka.server.util.LockUtils.{inReadLock, inWriteLock}
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
 import org.slf4j.event.Level
 
@@ -67,11 +67,11 @@ class DelayedOperations(topicId: Option[Uuid],
 
   def checkAndCompleteAll(): Unit = {
     val requestKey = new TopicPartitionOperationKey(topicPartition)
-    CoreUtils.swallow(() -> fetch.checkAndComplete(requestKey), this, 
Level.ERROR)
-    CoreUtils.swallow(() -> produce.checkAndComplete(requestKey), this, 
Level.ERROR)
-    CoreUtils.swallow(() -> deleteRecords.checkAndComplete(requestKey), this, 
Level.ERROR)
-    if (topicId.isDefined) CoreUtils.swallow(() -> 
shareFetch.checkAndComplete(new DelayedShareFetchPartitionKey(
-      topicId.get, topicPartition.partition())), this, Level.ERROR)
+    Utils.swallow(this.logger.underlying, Level.ERROR, () => 
fetch.checkAndComplete(requestKey))
+    Utils.swallow(this.logger.underlying, Level.ERROR, () => 
produce.checkAndComplete(requestKey))
+    Utils.swallow(this.logger.underlying, Level.ERROR, () => 
deleteRecords.checkAndComplete(requestKey))
+    if (topicId.isDefined) Utils.swallow(this.logger.underlying, Level.ERROR, 
() => shareFetch.checkAndComplete(
+      new DelayedShareFetchPartitionKey(topicId.get, 
topicPartition.partition())))
   }
 
   def numDelayedDelete: Int = deleteRecords.numDelayed()
@@ -266,7 +266,7 @@ class Partition(val topicPartition: TopicPartition,
   def inSyncReplicaIds: Set[Int] = 
partitionState.isr.asScala.map(_.toInt).toSet
 
   def maybeAddListener(listener: PartitionListener): Boolean = {
-    inReadLock(leaderIsrUpdateLock) {
+    inReadLock(leaderIsrUpdateLock, () => {
       // `log` is set to `None` when the partition is failed or deleted.
       log match {
         case Some(_) =>
@@ -276,7 +276,7 @@ class Partition(val topicPartition: TopicPartition,
         case None =>
           false
       }
-    }
+    })
   }
 
   def removeListener(listener: PartitionListener): Unit = {
@@ -295,7 +295,7 @@ class Partition(val topicPartition: TopicPartition,
     // The writeLock is needed to make sure that while the caller checks the 
log directory of the
     // current replica and the existence of the future replica, no other 
thread can update the log directory of the
     // current replica or remove the future replica.
-    inWriteLock(leaderIsrUpdateLock) {
+    inWriteLock(leaderIsrUpdateLock, () => {
       val currentLogDir = localLogOrException.parentDir
       if (currentLogDir == logDir) {
         info(s"Current log directory $currentLogDir is same as requested log 
dir $logDir. " +
@@ -314,7 +314,7 @@ class Partition(val topicPartition: TopicPartition,
             true
         }
       }
-    }
+    })
   }
 
   def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, 
offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid],
@@ -462,17 +462,17 @@ class Partition(val topicPartition: TopicPartition,
     remoteReplicasMap.values.asScala
 
   def futureReplicaDirChanged(newDestinationDir: String): Boolean = {
-    inReadLock(leaderIsrUpdateLock) {
+    inReadLock(leaderIsrUpdateLock, () => {
       futureLog.exists(_.parentDir != newDestinationDir)
-    }
+    })
   }
 
   def removeFutureLocalReplica(deleteFromLogDir: Boolean = true): Unit = {
-    inWriteLock(leaderIsrUpdateLock) {
+    inWriteLock[Exception](leaderIsrUpdateLock, () => {
       futureLog = None
       if (deleteFromLogDir)
         logManager.asyncDelete(topicPartition, isFuture = true)
-    }
+    })
   }
 
   // Returns a VerificationGuard if we need to verify. This starts or 
continues the verification process. Otherwise return the
@@ -504,7 +504,7 @@ class Partition(val topicPartition: TopicPartition,
       if (futureReplicaLEO.contains(localReplicaLEO)) {
         // The write lock is needed to make sure that while 
ReplicaAlterDirThread checks the LEO of the
         // current replica, no other thread can update LEO of the current 
replica via log truncation or log append operation.
-        inWriteLock(leaderIsrUpdateLock) {
+        inWriteLock(leaderIsrUpdateLock, () => {
           futureLog match {
             case Some(futurePartitionLog) =>
               if (log.exists(_.logEndOffset == 
futurePartitionLog.logEndOffset)) {
@@ -518,7 +518,7 @@ class Partition(val topicPartition: TopicPartition,
               // state again to avoid race condition
               false
           }
-        }
+        })
       } else false
     }
   }
@@ -531,13 +531,13 @@ class Partition(val topicPartition: TopicPartition,
    */
   def delete(): Unit = {
     // need to hold the lock to prevent appendMessagesToLeader() from hitting 
I/O exceptions due to log being deleted
-    inWriteLock(leaderIsrUpdateLock) {
+    inWriteLock[Exception](leaderIsrUpdateLock, () => {
       clear()
       listeners.forEach { listener =>
         listener.onDeleted(topicPartition)
       }
       listeners.clear()
-    }
+    })
   }
 
   /**
@@ -545,14 +545,14 @@ class Partition(val topicPartition: TopicPartition,
    * transitions to Offline.
    */
   def markOffline(): Unit = {
-    inWriteLock(leaderIsrUpdateLock) {
+    inWriteLock[Exception](leaderIsrUpdateLock, () => {
       clear()
 
       listeners.forEach { listener =>
         listener.onFailed(topicPartition)
       }
       listeners.clear()
-    }
+    })
   }
 
   /**
@@ -589,7 +589,7 @@ class Partition(val topicPartition: TopicPartition,
                  highWatermarkCheckpoints: OffsetCheckpoints,
                  topicId: Option[Uuid],
                  targetDirectoryId: Option[Uuid] = None): Boolean = {
-    val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
+    val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock, 
() => {
       // Partition state changes are expected to have a partition epoch larger 
or equal
       // to the current partition epoch. The latter is allowed because the 
partition epoch
       // is also updated by the AlterPartition response so the new epoch might 
be known
@@ -675,7 +675,7 @@ class Partition(val topicPartition: TopicPartition,
 
       // We may need to increment high watermark since ISR could be down to 1.
       (maybeIncrementLeaderHW(leaderLog, currentTimeMs = currentTimeMs), 
isNewLeader)
-    }
+    })
 
     // Some delayed operations may be unblocked after HW changed.
     if (leaderHWIncremented)
@@ -695,7 +695,7 @@ class Partition(val topicPartition: TopicPartition,
                    highWatermarkCheckpoints: OffsetCheckpoints,
                    topicId: Option[Uuid],
                    targetLogDirectoryId: Option[Uuid] = None): Boolean = {
-    inWriteLock(leaderIsrUpdateLock) {
+    inWriteLock(leaderIsrUpdateLock, () => {
       if (partitionRegistration.partitionEpoch < partitionEpoch) {
         stateChangeLogger.info(s"Skipped the become-follower state change for 
$topicPartition with topic id $topicId, " +
           s"partition registration $partitionRegistration and isNew=$isNew 
since the follower is already at a newer partition epoch $partitionEpoch.")
@@ -738,7 +738,7 @@ class Partition(val topicPartition: TopicPartition,
       // We must restart the fetchers when the leader epoch changed regardless 
of
       // whether the leader changed as well.
       isNewLeaderEpoch
-    }
+    })
   }
 
   private def createLogInAssignedDirectoryId(isNew: Boolean, 
highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], 
targetLogDirectoryId: Option[Uuid]): Unit = {
@@ -777,7 +777,7 @@ class Partition(val topicPartition: TopicPartition,
 
     // Apply read lock here to avoid the race between ISR updates and the 
fetch requests from rebooted follower. It
     // could break the broker epoch checks in the ISR expansion.
-    inReadLock(leaderIsrUpdateLock) {
+    inReadLock[Exception](leaderIsrUpdateLock, () => {
       replica.updateFetchStateOrThrow(
         followerFetchOffsetMetadata,
         followerStartOffset,
@@ -785,7 +785,7 @@ class Partition(val topicPartition: TopicPartition,
         leaderEndOffset,
         brokerEpoch
       )
-    }
+    })
 
     val newLeaderLW = if (delayedOperations.numDelayedDelete > 0) 
lowWatermarkIfLeader else -1L
     // check if the LW of the partition has incremented
@@ -800,9 +800,9 @@ class Partition(val topicPartition: TopicPartition,
     val leaderHWIncremented = if (prevFollowerEndOffset != 
replica.stateSnapshot.logEndOffset) {
       // the leader log may be updated by ReplicaAlterLogDirsThread so the 
following method must be in lock of
       // leaderIsrUpdateLock to prevent adding new hw to invalid log.
-      inReadLock(leaderIsrUpdateLock) {
+      inReadLock(leaderIsrUpdateLock, () => {
         leaderLogIfLocal.exists(leaderLog => maybeIncrementLeaderHW(leaderLog, 
followerFetchTimeMs))
-      }
+      })
     } else {
       false
     }
@@ -873,11 +873,11 @@ class Partition(val topicPartition: TopicPartition,
    * This function can be triggered when a replica's LEO has incremented.
    */
   private def maybeExpandIsr(followerReplica: Replica): Unit = {
-    val needsIsrUpdate = !partitionState.isInflight && 
canAddReplicaToIsr(followerReplica.brokerId) && inReadLock(leaderIsrUpdateLock) 
{
+    val needsIsrUpdate = !partitionState.isInflight && 
canAddReplicaToIsr(followerReplica.brokerId) && inReadLock(leaderIsrUpdateLock, 
() => {
       needsExpandIsr(followerReplica)
-    }
+    })
     if (needsIsrUpdate) {
-      val alterIsrUpdateOpt = inWriteLock(leaderIsrUpdateLock) {
+      val alterIsrUpdateOpt = inWriteLock(leaderIsrUpdateLock, () => {
         // check if this replica needs to be added to the ISR
         partitionState match {
           case currentState: CommittedPartitionState if 
needsExpandIsr(followerReplica) =>
@@ -885,7 +885,7 @@ class Partition(val topicPartition: TopicPartition,
           case _ =>
             None
         }
-      }
+      })
       // Send the AlterPartition request outside of the LeaderAndIsr lock 
since the completion logic
       // may increment the high watermark (and consequently complete delayed 
operations).
       alterIsrUpdateOpt.foreach(submitAlterPartition)
@@ -1087,13 +1087,13 @@ class Partition(val topicPartition: TopicPartition,
 
   def maybeShrinkIsr(): Unit = {
     def needsIsrUpdate: Boolean = {
-      !partitionState.isInflight && inReadLock(leaderIsrUpdateLock) {
+      !partitionState.isInflight && inReadLock(leaderIsrUpdateLock, () => {
         needsShrinkIsr()
-      }
+      })
     }
 
     if (needsIsrUpdate) {
-      val alterIsrUpdateOpt = inWriteLock(leaderIsrUpdateLock) {
+      val alterIsrUpdateOpt = inWriteLock(leaderIsrUpdateLock, () => {
         leaderLogIfLocal.flatMap { leaderLog =>
           val outOfSyncReplicaIds = getOutOfSyncReplicas(replicaLagTimeMaxMs)
           partitionState match {
@@ -1118,7 +1118,7 @@ class Partition(val topicPartition: TopicPartition,
               None
           }
         }
-      }
+      })
       // Send the AlterPartition request outside of the LeaderAndIsr lock 
since the completion logic
       // may increment the high watermark (and consequently complete delayed 
operations).
       alterIsrUpdateOpt.foreach(submitAlterPartition)
@@ -1171,11 +1171,11 @@ class Partition(val topicPartition: TopicPartition,
     if (isFuture) {
       // The read lock is needed to handle race condition if request handler 
thread tries to
       // remove future replica after receiving AlterReplicaLogDirsRequest.
-      inReadLock(leaderIsrUpdateLock) {
+      inReadLock(leaderIsrUpdateLock, () => {
         // Note the replica may be undefined if it is removed by a 
non-ReplicaAlterLogDirsThread before
         // this method is called
         futureLog.map { _.appendAsFollower(records, partitionLeaderEpoch) }
-      }
+      })
     } else {
       // The lock is needed to prevent the follower replica from being updated 
while ReplicaAlterDirThread
       // is executing maybeReplaceCurrentWithFutureReplica() to replace 
follower replica with the future replica.
@@ -1223,7 +1223,7 @@ class Partition(val topicPartition: TopicPartition,
     verificationGuard: VerificationGuard = VerificationGuard.SENTINEL,
     transactionVersion: Short = TransactionVersion.TV_UNKNOWN
   ): LogAppendInfo = {
-    val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
+    val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock, () => {
       leaderLogIfLocal match {
         case Some(leaderLog) =>
           val minIsr = effectiveMinIsr(leaderLog)
@@ -1245,7 +1245,7 @@ class Partition(val topicPartition: TopicPartition,
           throw new NotLeaderOrFollowerException("Leader not local for 
partition %s on broker %d"
             .format(topicPartition, localBrokerId))
       }
-    }
+    })
 
     info.copy(if (leaderHWIncremented) LeaderHwChange.INCREASED else 
LeaderHwChange.SAME)
   }
@@ -1296,7 +1296,7 @@ class Partition(val topicPartition: TopicPartition,
 
     if (fetchParams.isFromFollower) {
       // Check that the request is from a valid replica before doing the read
-      val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock) {
+      val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock, () => {
         val localLog = localLogWithEpochOrThrow(
           fetchPartitionData.currentLeaderEpoch,
           fetchParams.fetchOnlyLeader
@@ -1307,7 +1307,7 @@ class Partition(val topicPartition: TopicPartition,
         )
         val logReadInfo = readFromLocalLog(localLog)
         (replica, logReadInfo)
-      }
+      })
 
       if (updateFetchState && !logReadInfo.divergingEpoch.isPresent) {
         updateFollowerFetchState(
@@ -1322,13 +1322,13 @@ class Partition(val topicPartition: TopicPartition,
 
       logReadInfo
     } else {
-      inReadLock(leaderIsrUpdateLock) {
+      inReadLock(leaderIsrUpdateLock, () => {
         val localLog = localLogWithEpochOrThrow(
           fetchPartitionData.currentLeaderEpoch,
           fetchParams.fetchOnlyLeader
         )
         readFromLocalLog(localLog)
-      }
+      })
     }
   }
 
@@ -1433,7 +1433,7 @@ class Partition(val topicPartition: TopicPartition,
                               isolationLevel: Option[IsolationLevel],
                               currentLeaderEpoch: Optional[Integer],
                               fetchOnlyFromLeader: Boolean,
-                              remoteLogManager: Option[RemoteLogManager] = 
None): OffsetResultHolder = inReadLock(leaderIsrUpdateLock) {
+                              remoteLogManager: Option[RemoteLogManager] = 
None): OffsetResultHolder = inReadLock(leaderIsrUpdateLock, () => {
     // decide whether to only fetch from leader
     val localLog = localLogWithEpochOrThrow(currentLeaderEpoch, 
fetchOnlyFromLeader)
 
@@ -1478,7 +1478,7 @@ class Partition(val topicPartition: TopicPartition,
         
offsetResultHolder.lastFetchableOffset(Optional.of(lastFetchableOffset))
         offsetResultHolder
     }
-  }
+  })
 
   def activeProducerState: DescribeProducersResponseData.PartitionResponse = {
     val producerState = new DescribeProducersResponseData.PartitionResponse()
@@ -1498,16 +1498,16 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   def fetchOffsetSnapshot(currentLeaderEpoch: Optional[Integer],
-                          fetchOnlyFromLeader: Boolean): LogOffsetSnapshot = 
inReadLock(leaderIsrUpdateLock) {
+                          fetchOnlyFromLeader: Boolean): LogOffsetSnapshot = 
inReadLock(leaderIsrUpdateLock, () => {
     // decide whether to only fetch from leader
     val localLog = localLogWithEpochOrThrow(currentLeaderEpoch, 
fetchOnlyFromLeader)
     localLog.fetchOffsetSnapshot
-  }
+  })
 
   def logStartOffset: Long = {
-    inReadLock(leaderIsrUpdateLock) {
+    inReadLock[Long, Exception](leaderIsrUpdateLock, () => {
       leaderLogIfLocal.map(_.logStartOffset).getOrElse(-1)
-    }
+    })
   }
 
   /**
@@ -1516,7 +1516,7 @@ class Partition(val topicPartition: TopicPartition,
    *
    * Return low watermark of the partition.
    */
-  def deleteRecordsOnLeader(offset: Long): LogDeleteRecordsResult = 
inReadLock(leaderIsrUpdateLock) {
+  def deleteRecordsOnLeader(offset: Long): LogDeleteRecordsResult = 
inReadLock(leaderIsrUpdateLock, () => {
     leaderLogIfLocal match {
       case Some(leaderLog) =>
         if (!leaderLog.config.delete && leaderLog.config.compact)
@@ -1537,7 +1537,7 @@ class Partition(val topicPartition: TopicPartition,
       case None =>
         throw new NotLeaderOrFollowerException(s"Leader not local for 
partition $topicPartition on broker $localBrokerId")
     }
-  }
+  })
 
   /**
     * Truncate the local log of this partition to the specified offset and 
checkpoint the recovery point to this offset
@@ -1548,9 +1548,9 @@ class Partition(val topicPartition: TopicPartition,
   def truncateTo(offset: Long, isFuture: Boolean): Unit = {
     // The read lock is needed to prevent the follower replica from being 
truncated while ReplicaAlterDirThread
     // is executing maybeReplaceCurrentWithFutureReplica() to replace follower 
replica with the future replica.
-    inReadLock(leaderIsrUpdateLock) {
+    inReadLock[Exception](leaderIsrUpdateLock, () => {
       logManager.truncateTo(Map(topicPartition -> offset), isFuture = isFuture)
-    }
+    })
   }
 
   /**
@@ -1565,9 +1565,9 @@ class Partition(val topicPartition: TopicPartition,
                               logStartOffsetOpt: Optional[JLong] = 
Optional.empty): Unit = {
     // The read lock is needed to prevent the follower replica from being 
truncated while ReplicaAlterDirThread
     // is executing maybeReplaceCurrentWithFutureReplica() to replace follower 
replica with the future replica.
-    inReadLock(leaderIsrUpdateLock) {
+    inReadLock[Exception](leaderIsrUpdateLock, () => {
       logManager.truncateFullyAndStartAt(topicPartition, newOffset, isFuture = 
isFuture, logStartOffsetOpt)
-    }
+    })
   }
 
   /**
@@ -1586,7 +1586,7 @@ class Partition(val topicPartition: TopicPartition,
   def lastOffsetForLeaderEpoch(currentLeaderEpoch: Optional[Integer],
                                leaderEpoch: Int,
                                fetchOnlyFromLeader: Boolean): EpochEndOffset = 
{
-    inReadLock(leaderIsrUpdateLock) {
+    inReadLock(leaderIsrUpdateLock, () => {
       val localLogOrError = getLocalLog(currentLeaderEpoch, 
fetchOnlyFromLeader)
       localLogOrError match {
         case Left(localLog) =>
@@ -1604,7 +1604,7 @@ class Partition(val topicPartition: TopicPartition,
           .setPartition(partitionId)
           .setErrorCode(error.code)
       }
-    }
+    })
   }
 
   private def prepareIsrExpand(
@@ -1695,7 +1695,7 @@ class Partition(val topicPartition: TopicPartition,
       var hwIncremented = false
       var shouldRetry = false
 
-      inWriteLock(leaderIsrUpdateLock) {
+      inWriteLock[Exception](leaderIsrUpdateLock, () => {
         if (partitionState != proposedIsrState) {
           // This means partitionState was updated through leader election or 
some other mechanism
           // before we got the AlterPartition response. We don't know what 
happened on the controller
@@ -1707,7 +1707,7 @@ class Partition(val topicPartition: TopicPartition,
         } else {
           shouldRetry = handleAlterPartitionError(proposedIsrState, 
Errors.forException(e))
         }
-      }
+      })
 
       if (hwIncremented) {
         tryCompleteDelayedRequests()
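
For the lock helpers, the Scala by-name form inReadLock(lock) { ... } becomes inReadLock(lock, () => { ... }). The Java signatures of the new LockUtils.inReadLock/inWriteLock are not shown in this excerpt; judging from the call sites they are generic in the result and checked-exception types, which is why some callers pin the type arguments explicitly (inWriteLock[Exception], inReadLock[Long, Exception]) where Scala cannot infer them, typically for Unit-returning bodies. A before/after sketch using call sites from the Partition.scala hunks above:

    // Before: CoreUtils took the protected block by name.
    inReadLock(leaderIsrUpdateLock) {
      futureLog.exists(_.parentDir != newDestinationDir)
    }

    // After: LockUtils takes a function literal; explicit type arguments are
    // added where inference needs help (for example a Unit-returning body).
    inReadLock(leaderIsrUpdateLock, () => {
      futureLog.exists(_.parentDir != newDestinationDir)
    })

    inWriteLock[Exception](leaderIsrUpdateLock, () => {
      futureLog = None
      if (deleteFromLogDir)
        logManager.asyncDelete(topicPartition, isFuture = true)
    })
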
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 800ac310a13..9f647cd313a 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantReadWriteLock
 import kafka.server.ReplicaManager
-import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils.Logging
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.internals.Topic
@@ -42,12 +41,12 @@ import org.apache.kafka.server.config.ServerConfigs
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.server.storage.log.FetchIsolation
 import org.apache.kafka.server.util.Scheduler
+import org.apache.kafka.server.util.LockUtils.{inReadLock, inWriteLock}
 import org.apache.kafka.storage.internals.log.AppendOrigin
 import com.google.re2j.{Pattern, PatternSyntaxException}
 import org.apache.kafka.common.errors.InvalidRegularExpression
 
 import java.util.Optional
-
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable
 
@@ -118,9 +117,9 @@ class TransactionStateManager(brokerId: Int,
   // visible for testing only
   private[transaction] def addLoadingPartition(partitionId: Int, 
coordinatorEpoch: Int): Unit = {
     val partitionAndLeaderEpoch = 
TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
-    inWriteLock(stateLock) {
+    inWriteLock(stateLock, () => {
       loadingPartitions.add(partitionAndLeaderEpoch)
-    }
+    })
   }
 
   // this is best-effort expiration of an ongoing transaction which has been 
open for more than its
@@ -129,7 +128,7 @@ class TransactionStateManager(brokerId: Int,
   // metadata to abort later.
   def timedOutTransactions(): Iterable[TransactionalIdAndProducerIdEpoch] = {
     val now = time.milliseconds()
-    inReadLock(stateLock) {
+    inReadLock(stateLock, () => {
       transactionMetadataCache.flatMap { case (_, entry) =>
         entry.metadataPerTransactionalId.asScala.filter { case (_, 
txnMetadata) =>
           if (txnMetadata.pendingTransitionInProgress) {
@@ -147,14 +146,14 @@ class TransactionStateManager(brokerId: Int,
           TransactionalIdAndProducerIdEpoch(txnId, txnMetadata.producerId, 
txnMetadata.producerEpoch)
         }
       }
-    }
+    })
   }
 
   private def removeExpiredTransactionalIds(
     transactionPartition: TopicPartition,
     txnMetadataCacheEntry: TxnMetadataCacheEntry,
   ): Unit = {
-    inReadLock(stateLock) {
+    inReadLock[Exception](stateLock, () => {
       replicaManager.getLogConfig(transactionPartition) match {
         case Some(logConfig) =>
           val currentTimeMs = time.milliseconds()
@@ -219,7 +218,7 @@ class TransactionStateManager(brokerId: Int,
           warn(s"Transaction expiration for partition $transactionPartition 
failed because the log " +
             "config was not available, which likely means the partition is not 
online or is no longer local.")
       }
-    }
+    })
   }
 
   private def shouldExpire(
@@ -245,12 +244,12 @@ class TransactionStateManager(brokerId: Int,
   }
 
   private[transaction] def removeExpiredTransactionalIds(): Unit = {
-    inReadLock(stateLock) {
+    inReadLock[Exception](stateLock, () => {
       transactionMetadataCache.foreachEntry { (partitionId, 
partitionCacheEntry) =>
         val transactionPartition = new 
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
         removeExpiredTransactionalIds(transactionPartition, 
partitionCacheEntry)
       }
-    }
+    })
   }
 
   private def writeTombstonesForExpiredTransactionalIds(
@@ -260,7 +259,7 @@ class TransactionStateManager(brokerId: Int,
   ): Unit = {
     def removeFromCacheCallback(responses: collection.Map[TopicIdPartition, 
PartitionResponse]): Unit = {
       responses.foreachEntry { (topicPartition, response) =>
-        inReadLock(stateLock) {
+        inReadLock[Exception](stateLock, () => {
           transactionMetadataCache.get(topicPartition.partition).foreach { 
txnMetadataCacheEntry =>
             expiredForPartition.foreach { idCoordinatorEpochAndMetadata =>
               val transactionalId = 
idCoordinatorEpochAndMetadata.transactionalId
@@ -283,11 +282,11 @@ class TransactionStateManager(brokerId: Int,
               })
             }
           }
-        }
+        })
       }
     }
 
-    inReadLock(stateLock) {
+    inReadLock[Exception](stateLock, () => {
       replicaManager.appendRecords(
         timeout = config.requestTimeoutMs,
         requiredAcks = TransactionLog.ENFORCED_REQUIRED_ACKS,
@@ -296,7 +295,7 @@ class TransactionStateManager(brokerId: Int,
         entriesPerPartition = 
Map(replicaManager.topicIdPartition(transactionPartition) -> tombstoneRecords),
         responseCallback = removeFromCacheCallback,
         requestLocal = RequestLocal.noCaching)
-    }
+    })
   }
 
   def enableTransactionalIdExpiration(): Unit = {
@@ -323,7 +322,7 @@ class TransactionStateManager(brokerId: Int,
     filterDurationMs: Long,
     filterTransactionalIdPattern: String
   ): ListTransactionsResponseData = {
-    inReadLock(stateLock) {
+    inReadLock(stateLock, () => {
       val response = new ListTransactionsResponseData()
       if (loadingPartitions.nonEmpty) {
         response.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code)
@@ -382,7 +381,7 @@ class TransactionStateManager(brokerId: Int,
         response.setErrorCode(Errors.NONE.code)
           .setTransactionStates(states)
       }
-    }
+    })
   }
 
   /**
@@ -394,7 +393,7 @@ class TransactionStateManager(brokerId: Int,
    */
   private def getAndMaybeAddTransactionState(transactionalId: String,
                                              createdTxnMetadataOpt: 
Option[TransactionMetadata]): Either[Errors, 
Option[CoordinatorEpochAndTxnMetadata]] = {
-    inReadLock(stateLock) {
+    inReadLock(stateLock, () => {
       val partitionId = partitionFor(transactionalId)
       if (loadingPartitions.exists(_.txnPartitionId == partitionId))
         Left(Errors.COORDINATOR_LOAD_IN_PROGRESS)
@@ -413,7 +412,7 @@ class TransactionStateManager(brokerId: Int,
             Left(Errors.NOT_COORDINATOR)
         }
       }
-    }
+    })
   }
 
   /**
@@ -465,9 +464,9 @@ class TransactionStateManager(brokerId: Int,
         var readAtLeastOneRecord = true
 
         try {
-          while (currOffset < logEndOffset && readAtLeastOneRecord && 
!shuttingDown.get() && inReadLock(stateLock) {
+          while (currOffset < logEndOffset && readAtLeastOneRecord && 
!shuttingDown.get() && inReadLock(stateLock, () => {
             loadingPartitions.exists { idAndEpoch: 
TransactionPartitionAndLeaderEpoch =>
-              idAndEpoch.txnPartitionId == topicPartition.partition && 
idAndEpoch.coordinatorEpoch == coordinatorEpoch}}) {
+              idAndEpoch.txnPartitionId == topicPartition.partition && 
idAndEpoch.coordinatorEpoch == coordinatorEpoch}})) {
             val fetchDataInfo = log.read(currOffset, 
config.transactionLogLoadBufferSize, FetchIsolation.LOG_END, true)
 
             readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
@@ -547,9 +546,9 @@ class TransactionStateManager(brokerId: Int,
     val topicPartition = new 
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
     val partitionAndLeaderEpoch = 
TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
 
-    inWriteLock(stateLock) {
+    inWriteLock(stateLock, () => {
       loadingPartitions.add(partitionAndLeaderEpoch)
-    }
+    })
 
     def loadTransactions(startTimeMs: java.lang.Long): Unit = {
       val schedulerTimeMs = time.milliseconds() - startTimeMs
@@ -563,7 +562,7 @@ class TransactionStateManager(brokerId: Int,
       info(s"Finished loading ${loadedTransactions.size} transaction metadata 
from $topicPartition in " +
         s"$totalLoadingTimeMs milliseconds, of which $schedulerTimeMs 
milliseconds was spent in the scheduler.")
 
-      inWriteLock(stateLock) {
+      inWriteLock[Exception](stateLock, () => {
         if (loadingPartitions.contains(partitionAndLeaderEpoch)) {
           addLoadedTransactionsToCache(topicPartition.partition, 
coordinatorEpoch, loadedTransactions)
 
@@ -595,7 +594,7 @@ class TransactionStateManager(brokerId: Int,
               txnTransitMetadata.txnMetadata, 
txnTransitMetadata.transitMetadata)
           }
         }
-      }
+      })
 
       info(s"Completed loading transaction metadata from $topicPartition for 
coordinator epoch $coordinatorEpoch")
     }
@@ -606,13 +605,13 @@ class TransactionStateManager(brokerId: Int,
 
   def removeTransactionsForTxnTopicPartition(partitionId: Int): Unit = {
     val topicPartition = new 
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
-    inWriteLock(stateLock) {
+    inWriteLock[Exception](stateLock, () => {
       loadingPartitions --= loadingPartitions.filter(_.txnPartitionId == 
partitionId)
       transactionMetadataCache.remove(partitionId).foreach { 
txnMetadataCacheEntry =>
         info(s"Unloaded transaction metadata $txnMetadataCacheEntry for 
$topicPartition following " +
           s"local partition deletion")
       }
-    }
+    })
   }
 
   /**
@@ -622,7 +621,7 @@ class TransactionStateManager(brokerId: Int,
   def removeTransactionsForTxnTopicPartition(partitionId: Int, 
coordinatorEpoch: Int): Unit = {
     val topicPartition = new 
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
 
-    inWriteLock(stateLock) {
+    inWriteLock[Exception](stateLock, () => {
       removeLoadingPartitionWithEpoch(partitionId, coordinatorEpoch)
       transactionMetadataCache.remove(partitionId) match {
         case Some(txnMetadataCacheEntry) =>
@@ -631,7 +630,7 @@ class TransactionStateManager(brokerId: Int,
         case None =>
           info(s"No cached transaction metadata found for $topicPartition 
during become-follower transition")
       }
-    }
+    })
   }
 
   /**
@@ -777,7 +776,7 @@ class TransactionStateManager(brokerId: Int,
       responseCallback(responseError)
     }
 
-    inReadLock(stateLock) {
+    inReadLock[Exception](stateLock, () => {
       // we need to hold the read lock on the transaction metadata cache until 
appending to local log returns;
       // this is to avoid the case where an emigration followed by an 
immigration could have completed after the check
       // returns and before appendRecords() is called, since otherwise entries 
with a high coordinator epoch could have
@@ -818,7 +817,7 @@ class TransactionStateManager(brokerId: Int,
             trace(s"Appending new metadata $newMetadata for transaction id 
$transactionalId with coordinator epoch $coordinatorEpoch to the local 
transaction log")
           }
       }
-    }
+    })
   }
 
   def startup(retrieveTransactionTopicPartitionCount: () => Int, 
enableTransactionalIdExpiration: Boolean): Unit = {
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index bfee35061f8..5ea048cf4d7 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -24,7 +24,7 @@ import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
 import kafka.server.{KafkaConfig, KafkaRaftServer}
 import kafka.utils.threadsafe
-import kafka.utils.{CoreUtils, Logging}
+import kafka.utils.Logging
 import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, 
Uuid}
 import org.apache.kafka.common.utils.{Exit, KafkaThread, Time, Utils}
 import org.apache.kafka.common.errors.{InconsistentTopicIdException, 
KafkaStorageException, LogDirNotFoundException}
@@ -253,7 +253,7 @@ class LogManager(logDirs: Seq[File],
 
       warn(s"Logs for partitions 
${offlineCurrentTopicPartitions.mkString(",")} are offline and " +
            s"logs for future partitions 
${offlineFutureTopicPartitions.mkString(",")} are offline due to failure on log 
directory $dir")
-      dirLocks.filter(_.file.getParent == dir).foreach(dir => 
CoreUtils.swallow(dir.destroy(), this))
+      dirLocks.filter(_.file.getParent == dir).foreach(dir => 
Utils.swallow(this.logger.underlying, () => dir.destroy()))
     }
   }
 
@@ -656,7 +656,7 @@ class LogManager(logDirs: Seq[File],
 
     // stop the cleaner first
     if (cleaner != null) {
-      CoreUtils.swallow(cleaner.shutdown(), this)
+      Utils.swallow(this.logger.underlying, () => cleaner.shutdown())
     }
 
     val localLogsByDir = logsByDir
@@ -704,7 +704,7 @@ class LogManager(logDirs: Seq[File],
               loadLogsCompletedFlags.getOrDefault(logDirAbsolutePath, false)) {
             val cleanShutdownFileHandler = new 
CleanShutdownFileHandler(dir.getPath)
             debug(s"Writing clean shutdown marker at $dir with broker 
epoch=$brokerEpoch")
-            CoreUtils.swallow(cleanShutdownFileHandler.write(brokerEpoch), 
this)
+            Utils.swallow(this.logger.underlying, () => 
cleanShutdownFileHandler.write(brokerEpoch))
           }
         }
       }
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala 
b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
index eb6bae3ced6..363fab42036 100755
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
@@ -20,7 +20,7 @@
 
 package kafka.metrics
 
-import kafka.utils.{CoreUtils, VerifiableProperties}
+import kafka.utils.VerifiableProperties
 import org.apache.kafka.common.utils.Utils
 
 import java.util.concurrent.atomic.AtomicBoolean
@@ -67,7 +67,7 @@ object KafkaMetricsReporter {
             reporter.init(verifiableProps)
             reporters += reporter
             reporter match {
-              case bean: KafkaMetricsReporterMBean => 
CoreUtils.registerMBean(reporter, bean.getMBeanName)
+              case bean: KafkaMetricsReporterMBean => 
Utils.registerMBean(reporter, bean.getMBeanName)
               case _ =>
             }
           })
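
Utils.registerMBean keeps CoreUtils' best-effort semantics: it replaces any MBean already registered under the same name and returns false instead of throwing when registration fails. A hedged Scala sketch of registering a standard MBean; the HelloMBean trait, the Hello class and the object name are hypothetical, only the Utils.registerMBean call comes from this patch:

    import org.apache.kafka.common.utils.Utils

    // Standard JMX convention: the management interface is named <Class>MBean.
    trait HelloMBean {
      def getMessage: String
    }
    class Hello extends HelloMBean {
      override def getMessage: String = "hello"
    }

    // Returns true on success; a failure is logged and swallowed.
    val registered = Utils.registerMBean(new Hello, "kafka.example:type=Hello")
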
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index 306b633f6fa..b9de13d637f 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -929,7 +929,7 @@ private[kafka] class Processor(
       }
     } finally {
       debug(s"Closing selector - processor $id")
-      CoreUtils.swallow(closeAll(), this, Level.ERROR)
+      Utils.swallow(this.logger.underlying, Level.ERROR, () => closeAll())
     }
   }
 
@@ -1112,7 +1112,8 @@ private[kafka] class Processor(
         // the channel has been closed by the selector but the quotas still 
need to be updated
         connectionQuotas.dec(listenerName, InetAddress.getByName(remoteHost))
         // Call listeners to notify for closed connection.
-        connectionDisconnectListeners.foreach(listener => CoreUtils.swallow(() -> listener.onDisconnect(connectionId), this, Level.ERROR))
+        connectionDisconnectListeners.foreach(listener =>
+          Utils.swallow(this.logger.underlying, Level.ERROR, () => listener.onDisconnect(connectionId)))
       } catch {
         case e: Throwable => processException(s"Exception while processing 
disconnection of $connectionId", e)
       }
@@ -1142,7 +1143,8 @@ private[kafka] class Processor(
         connectionQuotas.dec(listenerName, address)
       selector.close(connectionId)
       // Call listeners to notify for closed connection.
-      connectionDisconnectListeners.foreach(listener => CoreUtils.swallow(() -> listener.onDisconnect(connectionId), this, Level.ERROR))
+      connectionDisconnectListeners.foreach(listener =>
+        Utils.swallow(this.logger.underlying, Level.ERROR, () => listener.onDisconnect(connectionId)))
 
       inflightResponses.remove(connectionId).foreach(response => 
updateRequestMetrics(response))
     }
@@ -1273,7 +1275,7 @@ private[kafka] class Processor(
       beginShutdown()
       thread.join()
       if (!started.get) {
-        CoreUtils.swallow(closeAll(), this, Level.ERROR)
+        Utils.swallow(this.logger.underlying, Level.ERROR, () => closeAll())
       }
     } finally {
       metricsGroup.removeMetric("IdlePercent", Map("networkProcessor" -> 
id.toString).asJava)
diff --git a/core/src/main/scala/kafka/raft/KafkaRaftManager.scala 
b/core/src/main/scala/kafka/raft/KafkaRaftManager.scala
index 8bd616079c6..1d1d72767db 100644
--- a/core/src/main/scala/kafka/raft/KafkaRaftManager.scala
+++ b/core/src/main/scala/kafka/raft/KafkaRaftManager.scala
@@ -23,7 +23,6 @@ import java.nio.file.Paths
 import java.util.{OptionalInt, Collection => JCollection, Map => JMap}
 import java.util.concurrent.CompletableFuture
 import kafka.server.KafkaConfig
-import kafka.utils.CoreUtils
 import kafka.utils.Logging
 import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, 
MetadataRecoveryStrategy, NetworkClient}
 import org.apache.kafka.common.KafkaException
@@ -143,13 +142,13 @@ class KafkaRaftManager[T](
   }
 
   def shutdown(): Unit = {
-    CoreUtils.swallow(expirationService.shutdown(), this)
+    Utils.swallow(this.logger.underlying, () => expirationService.shutdown())
     Utils.closeQuietly(expirationTimer, "expiration timer")
-    CoreUtils.swallow(clientDriver.shutdown(), this)
-    CoreUtils.swallow(scheduler.shutdown(), this)
+    Utils.swallow(this.logger.underlying, () => clientDriver.shutdown())
+    Utils.swallow(this.logger.underlying, () => scheduler.shutdown())
     Utils.closeQuietly(netChannel, "net channel")
     Utils.closeQuietly(raftLog, "raft log")
-    CoreUtils.swallow(dataDirLock.foreach(_.destroy()), this)
+    Utils.swallow(this.logger.underlying, () => 
dataDirLock.foreach(_.destroy()))
   }
 
   override def handleRequest(
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 62bc9363872..115a505643d 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import com.yammer.metrics.core.Meter
-import kafka.utils.CoreUtils.inLock
 import kafka.utils.Logging
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.PartitionStates
@@ -29,16 +28,12 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
 import 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
 UNDEFINED_EPOCH_OFFSET}
 import org.apache.kafka.common.requests._
-
 import org.apache.kafka.common.{ClientIdAndBroker, InvalidRecordException, 
TopicPartition, Uuid}
 import org.apache.kafka.server.common.OffsetAndEpoch
-import org.apache.kafka.server.LeaderEndPoint
-import org.apache.kafka.server.ResultWithPartitions
-import org.apache.kafka.server.ReplicaState
-import org.apache.kafka.server.PartitionFetchState
+import org.apache.kafka.server.{LeaderEndPoint, PartitionFetchState, 
ReplicaState, ResultWithPartitions}
 import 
org.apache.kafka.server.log.remote.storage.RetriableRemoteStorageException
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
-import org.apache.kafka.server.util.ShutdownableThread
+import org.apache.kafka.server.util.{LockUtils, ShutdownableThread}
 import org.apache.kafka.storage.internals.log.LogAppendInfo
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
@@ -105,9 +100,7 @@ abstract class AbstractFetcherThread(name: String,
 
   override def shutdown(): Unit = {
     initiateShutdown()
-    inLock(partitionMapLock) {
-      partitionMapCond.signalAll()
-    }
+    LockUtils.inLock[Exception](partitionMapLock, () => 
partitionMapCond.signalAll())
     awaitShutdown()
 
     // we don't need the lock since the thread has finished shutdown and 
metric removal is safe
@@ -121,7 +114,7 @@ abstract class AbstractFetcherThread(name: String,
   }
 
   private def maybeFetch(): Unit = {
-    val fetchRequestOpt = inLock(partitionMapLock) {
+    val fetchRequestOpt = LockUtils.inLock(partitionMapLock, () => {
       val result = leader.buildFetch(partitionStates.partitionStateMap)
       val fetchRequestOpt = result.result
       val partitionsWithError = result.partitionsWithError
@@ -134,7 +127,7 @@ abstract class AbstractFetcherThread(name: String,
       }
 
       fetchRequestOpt
-    }
+    })
 
     fetchRequestOpt.ifPresent(replicaFetch =>
       processFetchRequest(replicaFetch.partitionData, 
replicaFetch.fetchRequest)
@@ -153,7 +146,8 @@ abstract class AbstractFetcherThread(name: String,
    * Builds offset for leader epoch requests for partitions that are in the 
truncating phase based
    * on latest epochs of the future replicas (the one that is fetching)
    */
-  private def fetchTruncatingPartitions(): (Map[TopicPartition, EpochData], 
Set[TopicPartition]) = inLock(partitionMapLock) {
+  private def fetchTruncatingPartitions(): (Map[TopicPartition, EpochData], 
Set[TopicPartition]) =
+    LockUtils.inLock(partitionMapLock, () => {
     val partitionsWithEpochs = mutable.Map.empty[TopicPartition, EpochData]
     val partitionsWithoutEpochs = mutable.Set.empty[TopicPartition]
 
@@ -172,7 +166,7 @@ abstract class AbstractFetcherThread(name: String,
     }
 
     (partitionsWithEpochs, partitionsWithoutEpochs)
-  }
+  })
 
   private def maybeTruncate(): Unit = {
     val (partitionsWithEpochs, partitionsWithoutEpochs) = 
fetchTruncatingPartitions()
@@ -215,7 +209,7 @@ abstract class AbstractFetcherThread(name: String,
     val endOffsets = 
leader.fetchEpochEndOffsets(latestEpochsForPartitions.asJava)
     // Ensure we hold a lock during truncation
 
-    inLock(partitionMapLock) {
+    LockUtils.inLock[Exception](partitionMapLock, () => {
       //Check no leadership and no leader epoch changes happened whilst we 
were unlocked, fetching epochs
 
       val epochEndOffsets = endOffsets.asScala.filter { case (tp, _) =>
@@ -231,20 +225,21 @@ abstract class AbstractFetcherThread(name: String,
       val result = maybeTruncateToEpochEndOffsets(epochEndOffsets, 
latestEpochsForPartitions)
       handlePartitionsWithErrors(result.partitionsWithError.asScala, 
"truncateToEpochEndOffsets")
       updateFetchOffsetAndMaybeMarkTruncationComplete(result.result)
-    }
+    })
   }
 
   // Visibility for unit tests
   protected[server] def truncateOnFetchResponse(epochEndOffsets: 
Map[TopicPartition, EpochEndOffset]): Unit = {
-    inLock(partitionMapLock) {
+    LockUtils.inLock[Exception](partitionMapLock, () => {
       val result = maybeTruncateToEpochEndOffsets(epochEndOffsets, Map.empty)
       handlePartitionsWithErrors(result.partitionsWithError.asScala, 
"truncateOnFetchResponse")
       updateFetchOffsetAndMaybeMarkTruncationComplete(result.result)
-    }
+    })
   }
 
   // Visible for testing
-  private[server] def truncateToHighWatermark(partitions: 
Set[TopicPartition]): Unit = inLock(partitionMapLock) {
+  private[server] def truncateToHighWatermark(partitions: 
Set[TopicPartition]): Unit =
+    LockUtils.inLock[Exception](partitionMapLock, () => {
     val fetchOffsets = mutable.HashMap.empty[TopicPartition, 
OffsetTruncationState]
 
     for (tp <- partitions) {
@@ -260,7 +255,7 @@ abstract class AbstractFetcherThread(name: String,
     }
 
     updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
-  }
+  })
 
   private def maybeTruncateToEpochEndOffsets(fetchedEpochs: 
Map[TopicPartition, EpochEndOffset],
                                              latestEpochsForPartitions: 
Map[TopicPartition, EpochData]): ResultWithPartitions[Map[TopicPartition, 
OffsetTruncationState]] = {
@@ -302,7 +297,8 @@ abstract class AbstractFetcherThread(name: String,
    *
    * @return true if the epoch in this thread is updated. otherwise, false
    */
-  private def onPartitionFenced(tp: TopicPartition, requestEpoch: 
Optional[Integer]): Boolean = inLock(partitionMapLock) {
+  private def onPartitionFenced(tp: TopicPartition, requestEpoch: 
Optional[Integer]): Boolean =
+    LockUtils.inLock(partitionMapLock, () => {
     Option(partitionStates.stateValue(tp)).exists { currentFetchState =>
       val currentLeaderEpoch = currentFetchState.currentLeaderEpoch
       if (requestEpoch.isPresent && requestEpoch.get == currentLeaderEpoch) {
@@ -315,7 +311,7 @@ abstract class AbstractFetcherThread(name: String,
         true
       }
     }
-  }
+  })
 
   // visible for testing
   private[server] def processFetchRequest(sessionPartitions: 
util.Map[TopicPartition, FetchRequest.PartitionData],
@@ -331,16 +327,16 @@ abstract class AbstractFetcherThread(name: String,
       case t: Throwable =>
         if (isRunning) {
           warn(s"Error in response for fetch request $fetchRequest", t)
-          inLock(partitionMapLock) {
+          LockUtils.inLock(partitionMapLock, () => {
             partitionsWithError ++= partitionStates.partitionSet.asScala
-          }
+          })
         }
     }
     fetcherStats.requestRate.mark()
 
     if (responseData.nonEmpty) {
       // process fetched data
-      inLock(partitionMapLock) {
+      LockUtils.inLock[Exception](partitionMapLock, () => {
         responseData.foreachEntry { (topicPartition, partitionData) =>
           Option(partitionStates.stateValue(topicPartition)).foreach { 
currentFetchState =>
             // It's possible that a partition is removed and re-added or 
truncated when there is a pending fetch request.
@@ -467,7 +463,7 @@ abstract class AbstractFetcherThread(name: String,
             }
           }
         }
-      }
+      })
     }
 
     if (divergingEndOffsets.nonEmpty)
@@ -604,8 +600,8 @@ abstract class AbstractFetcherThread(name: String,
    * @param tp                Topic partition
    * @param leaderEpochOffset Epoch end offset received from the leader for 
this topic partition
    */
-  private def getOffsetTruncationState(tp: TopicPartition,
-                                       leaderEpochOffset: EpochEndOffset): 
OffsetTruncationState = inLock(partitionMapLock) {
+  private def getOffsetTruncationState(tp: TopicPartition, leaderEpochOffset: 
EpochEndOffset): OffsetTruncationState =
+    LockUtils.inLock(partitionMapLock, () => {
     if (leaderEpochOffset.endOffset == UNDEFINED_EPOCH_OFFSET) {
       // truncate to initial offset which is the high watermark for follower 
replica. For
       // future replica, it is either high watermark of the future replica or 
current
@@ -655,7 +651,7 @@ abstract class AbstractFetcherThread(name: String,
         OffsetTruncationState(min(leaderEpochOffset.endOffset, 
replicaEndOffset), truncationCompleted = true)
       }
     }
-  }
+  })
 
   /**
    * Handle a partition whose offset is out of range and return a new fetch 
offset.
@@ -868,9 +864,10 @@ abstract class AbstractFetcherThread(name: String,
   }
 
   // Visible for testing
-  private[server] def fetchState(topicPartition: TopicPartition): 
Option[PartitionFetchState] = inLock(partitionMapLock) {
+  private[server] def fetchState(topicPartition: TopicPartition): 
Option[PartitionFetchState] =
+    LockUtils.inLock(partitionMapLock, () => {
     Option(partitionStates.stateValue(topicPartition))
-  }
+  })
 
   protected def toMemoryRecords(records: Records): MemoryRecords = {
     (records: @unchecked) match {
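
AbstractFetcherThread switches from CoreUtils.inLock to LockUtils.inLock for its plain ReentrantLock, again passing a function literal and pinning the exception type parameter for Unit-returning bodies. A hedged sketch of the shutdown-style usage mirrored above; the lock, condition and flag are hypothetical stand-ins for partitionMapLock, partitionMapCond and the fetcher state:

    import java.util.concurrent.locks.ReentrantLock
    import org.apache.kafka.server.util.LockUtils

    val mapLock = new ReentrantLock()        // stand-in for partitionMapLock
    val mapCond = mapLock.newCondition()     // stand-in for partitionMapCond
    var stopped = false                      // hypothetical state guarded by the lock

    // Wake up any thread blocked on the condition, while holding the lock.
    LockUtils.inLock[Exception](mapLock, () => {
      stopped = true
      mapCond.signalAll()
    })

    // Value-returning bodies return the supplier's result.
    val isStopped: Boolean = LockUtils.inLock(mapLock, () => stopped)
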
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index cd061c2c8ab..d9cb3aca495 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -24,7 +24,6 @@ import kafka.network.SocketServer
 import kafka.raft.KafkaRaftManager
 import kafka.server.metadata._
 import kafka.server.share.{ShareCoordinatorMetadataCacheHelperImpl, 
SharePartitionManager}
-import kafka.utils.CoreUtils
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.internals.Plugin
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -793,14 +792,14 @@ class BrokerServer(
       // Stop socket server to stop accepting any more connections and 
requests.
       // Socket server will be shutdown towards the end of the sequence.
       if (socketServer != null) {
-        CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
+        Utils.swallow(this.logger.underlying, () => 
socketServer.stopProcessingRequests())
       }
       metadataPublishers.forEach(p => 
sharedServer.loader.removeAndClosePublisher(p).get())
       metadataPublishers.clear()
       if (dataPlaneRequestHandlerPool != null)
-        CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
+        Utils.swallow(this.logger.underlying, () => 
dataPlaneRequestHandlerPool.shutdown())
       if (dataPlaneRequestProcessor != null)
-        CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
+        Utils.swallow(this.logger.underlying, () => 
dataPlaneRequestProcessor.close())
       authorizerPlugin.foreach(Utils.closeQuietly(_, "authorizer plugin"))
 
       /**
@@ -814,44 +813,44 @@ class BrokerServer(
        * broker would have to take hours to recover the log during restart.
        */
       if (kafkaScheduler != null)
-        CoreUtils.swallow(kafkaScheduler.shutdown(), this)
+        Utils.swallow(this.logger.underlying, () => kafkaScheduler.shutdown())
 
       if (transactionCoordinator != null)
-        CoreUtils.swallow(transactionCoordinator.shutdown(), this)
+        Utils.swallow(this.logger.underlying, () => 
transactionCoordinator.shutdown())
 
       if (groupConfigManager != null)
-        CoreUtils.swallow(groupConfigManager.close(), this)
+        Utils.swallow(this.logger.underlying, () => groupConfigManager.close())
 
       if (groupCoordinator != null)
-        CoreUtils.swallow(groupCoordinator.shutdown(), this)
+        Utils.swallow(this.logger.underlying, () => 
groupCoordinator.shutdown())
 
       if (partitionMetadataClient != null)
-        CoreUtils.swallow(partitionMetadataClient.close(), this)
+        Utils.swallow(this.logger.underlying, () => 
partitionMetadataClient.close())
 
       if (shareCoordinator != null)
-        CoreUtils.swallow(shareCoordinator.shutdown(), this)
+        Utils.swallow(this.logger.underlying, () => 
shareCoordinator.shutdown())
 
       if (autoTopicCreationManager != null)
-        CoreUtils.swallow(autoTopicCreationManager.close(), this)
+        Utils.swallow(this.logger.underlying, () => 
autoTopicCreationManager.close())
 
       if (assignmentsManager != null)
-        CoreUtils.swallow(assignmentsManager.close(), this)
+        Utils.swallow(this.logger.underlying, () => assignmentsManager.close())
 
       if (replicaManager != null)
-        CoreUtils.swallow(replicaManager.shutdown(), this)
+        Utils.swallow(this.logger.underlying, () => replicaManager.shutdown())
 
       if (alterPartitionManager != null)
-        CoreUtils.swallow(alterPartitionManager.shutdown(), this)
+        Utils.swallow(this.logger.underlying, () => 
alterPartitionManager.shutdown())
 
       if (forwardingManager != null)
-        CoreUtils.swallow(forwardingManager.close(), this)
+        Utils.swallow(this.logger.underlying, () => forwardingManager.close())
 
       if (clientToControllerChannelManager != null)
-        CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
+        Utils.swallow(this.logger.underlying, () => 
clientToControllerChannelManager.shutdown())
 
       if (logManager != null) {
         val brokerEpoch = if (lifecycleManager != null) 
lifecycleManager.brokerEpoch else -1
-        CoreUtils.swallow(logManager.shutdown(brokerEpoch), this)
+        Utils.swallow(this.logger.underlying, () => 
logManager.shutdown(brokerEpoch))
       }
 
       // Close remote log manager to give a chance to any of its underlying 
clients
@@ -859,21 +858,21 @@ class BrokerServer(
       remoteLogManagerOpt.foreach(Utils.closeQuietly(_, "remote log manager"))
 
       if (quotaManagers != null)
-        CoreUtils.swallow(quotaManagers.shutdown(), this)
+        Utils.swallow(this.logger.underlying, () => quotaManagers.shutdown())
 
       if (socketServer != null)
-        CoreUtils.swallow(socketServer.shutdown(), this)
+        Utils.swallow(this.logger.underlying, () => socketServer.shutdown())
 
       Utils.closeQuietly(brokerTopicStats, "broker topic stats")
       Utils.closeQuietly(sharePartitionManager, "share partition manager")
 
       if (persister != null)
-        CoreUtils.swallow(persister.stop(), this)
+        Utils.swallow(this.logger.underlying, () => persister.stop())
 
       if (lifecycleManager != null)
-        CoreUtils.swallow(lifecycleManager.close(), this)
+        Utils.swallow(this.logger.underlying, () => lifecycleManager.close())
 
-      CoreUtils.swallow(config.dynamicConfig.clear(), this)
+      Utils.swallow(this.logger.underlying, () => config.dynamicConfig.clear())
       Utils.closeQuietly(clientMetricsManager, "client metrics manager")
       sharedServer.stopForBroker()
       info("shut down completed")
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index b19cf062f56..7b9291ce165 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -23,7 +23,7 @@ import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.metadata.{ClientQuotaMetadataManager, 
DynamicConfigPublisher, KRaftMetadataCachePublisher}
 
 import scala.collection.immutable
-import kafka.utils.{CoreUtils, Logging}
+import kafka.utils.Logging
 import org.apache.kafka.common.internals.Plugin
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -449,7 +449,7 @@ class ControllerServer(
       Utils.closeQuietly(registrationManager, "registration manager")
       registrationManager = null
       if (registrationChannelManager != null) {
-        CoreUtils.swallow(registrationChannelManager.shutdown(), this)
+        Utils.swallow(this.logger.underlying, () => 
registrationChannelManager.shutdown())
         registrationChannelManager = null
       }
       metadataPublishers.forEach(p => 
sharedServer.loader.removeAndClosePublisher(p).get())
@@ -464,24 +464,24 @@ class ControllerServer(
       Utils.closeQuietly(registrationsPublisher, "registrations publisher")
       registrationsPublisher = null
       if (socketServer != null)
-        CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
+        Utils.swallow(this.logger.underlying, () => 
socketServer.stopProcessingRequests())
       if (controller != null)
         controller.beginShutdown()
       if (socketServer != null)
-        CoreUtils.swallow(socketServer.shutdown(), this)
+        Utils.swallow(this.logger.underlying, () => socketServer.shutdown())
       if (controllerApisHandlerPool != null)
-        CoreUtils.swallow(controllerApisHandlerPool.shutdown(), this)
+        Utils.swallow(this.logger.underlying, () => 
controllerApisHandlerPool.shutdown())
       if (controllerApis != null)
-        CoreUtils.swallow(controllerApis.close(), this)
+        Utils.swallow(this.logger.underlying, () => controllerApis.close())
       if (quotaManagers != null)
-        CoreUtils.swallow(quotaManagers.shutdown(), this)
+        Utils.swallow(this.logger.underlying, () => quotaManagers.shutdown())
       Utils.closeQuietly(controller, "controller")
       Utils.closeQuietly(quorumControllerMetrics, "quorum controller metrics")
       authorizerPlugin.foreach(Utils.closeQuietly(_, "authorizer plugin"))
       createTopicPolicy.foreach(policy => Utils.closeQuietly(policy, "create 
topic policy"))
       alterConfigPolicy.foreach(policy => Utils.closeQuietly(policy, "alter 
config policy"))
       socketServerFirstBoundPortFuture.completeExceptionally(new 
RuntimeException("shutting down"))
-      CoreUtils.swallow(config.dynamicConfig.clear(), this)
+      Utils.swallow(this.logger.underlying, () => config.dynamicConfig.clear())
       sharedServer.stopForController()
     } catch {
       case e: Throwable =>
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index f0b077b17f1..27156831b25 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -25,7 +25,7 @@ import kafka.log.LogManager
 import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.KafkaRaftManager
 import kafka.server.DynamicBrokerConfig._
-import kafka.utils.{CoreUtils, Logging}
+import kafka.utils.Logging
 import org.apache.kafka.common.Reconfigurable
 import org.apache.kafka.common.Endpoint
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
@@ -48,6 +48,7 @@ import 
org.apache.kafka.server.config.{DynamicProducerStateManagerConfig, Replic
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, 
MetricConfigs}
 import org.apache.kafka.server.telemetry.{ClientTelemetry, 
ClientTelemetryExporterProvider}
+import org.apache.kafka.server.util.LockUtils.{inReadLock, inWriteLock}
 import org.apache.kafka.snapshot.RecordsSnapshotReader
 import org.apache.kafka.storage.internals.log.{LogCleaner, LogConfig}
 
@@ -368,23 +369,23 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
   }
 
   // Visibility for testing
-  private[server] def currentKafkaConfig: KafkaConfig = 
CoreUtils.inReadLock(lock) {
+  private[server] def currentKafkaConfig: KafkaConfig = inReadLock(lock, () => 
{
     currentConfig
-  }
+  })
 
-  private[server] def currentDynamicBrokerConfigs: Map[String, String] = 
CoreUtils.inReadLock(lock) {
+  private[server] def currentDynamicBrokerConfigs: Map[String, String] = 
inReadLock(lock, () => {
     dynamicBrokerConfigs.clone()
-  }
+  })
 
-  private[server] def currentDynamicDefaultConfigs: Map[String, String] = 
CoreUtils.inReadLock(lock) {
+  private[server] def currentDynamicDefaultConfigs: Map[String, String] = 
inReadLock(lock, () => {
     dynamicDefaultConfigs.clone()
-  }
+  })
 
-  private[server] def clientTelemetryExporterPlugin: 
Option[ClientTelemetryExporterPlugin] = CoreUtils.inReadLock(lock) {
+  private[server] def clientTelemetryExporterPlugin: 
Option[ClientTelemetryExporterPlugin] = inReadLock(lock, () => {
     telemetryExporterPluginOpt
-  }
+  })
 
-  private[server] def updateBrokerConfig(brokerId: Int, persistentProps: 
Properties, doLog: Boolean = true): Unit = CoreUtils.inWriteLock(lock) {
+  private[server] def updateBrokerConfig(brokerId: Int, persistentProps: 
Properties, doLog: Boolean = true): Unit = inWriteLock[Exception](lock, () => {
     try {
       val props = fromPersistentProps(persistentProps, perBrokerConfig = true)
       dynamicBrokerConfigs.clear()
@@ -393,9 +394,9 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
     } catch {
       case e: Exception => error(s"Per-broker configs of $brokerId could not 
be applied: ${persistentProps.keySet()}", e)
     }
-  }
+  })
 
-  private[server] def updateDefaultConfig(persistentProps: Properties, doLog: 
Boolean = true): Unit = CoreUtils.inWriteLock(lock) {
+  private[server] def updateDefaultConfig(persistentProps: Properties, doLog: 
Boolean = true): Unit = inWriteLock[Exception](lock, () => {
     try {
       val props = fromPersistentProps(persistentProps, perBrokerConfig = false)
       dynamicDefaultConfigs.clear()
@@ -404,7 +405,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
     } catch {
       case e: Exception => error(s"Cluster default configs could not be 
applied: ${persistentProps.keySet()}", e)
     }
-  }
+  })
 
   /**
    * Config updates are triggered through actual changes in stored values.
@@ -414,7 +415,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
    * the SSL configs have changed, then the update will be handled when 
configuration changes are processed.
    * At the moment, only listener configs are considered for reloading.
    */
-  private[server] def reloadUpdatedFilesWithoutConfigChange(newProps: 
Properties): Unit = CoreUtils.inWriteLock(lock) {
+  private[server] def reloadUpdatedFilesWithoutConfigChange(newProps: 
Properties): Unit = inWriteLock[Exception](lock, () => {
     reconfigurables.forEach(r => {
       if (ReloadableFileConfigs.exists(r.reconfigurableConfigs.contains)) {
         r match {
@@ -427,7 +428,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
         }
       }
     })
-  }
+  })
 
   private[server] def fromPersistentProps(persistentProps: Properties,
                                           perBrokerConfig: Boolean): 
Properties = {
@@ -471,10 +472,10 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
     newProps
   }
 
-  private[server] def validate(props: Properties, perBrokerConfig: Boolean): 
Unit = CoreUtils.inReadLock(lock) {
+  private[server] def validate(props: Properties, perBrokerConfig: Boolean): 
Unit = inReadLock(lock, () => {
     val newProps = validatedKafkaProps(props, perBrokerConfig)
     processReconfiguration(newProps, validateOnly = true)
-  }
+  })
 
   private def removeInvalidConfigs(props: Properties, perBrokerConfig: 
Boolean): Unit = {
     try {
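
Note: DynamicBrokerConfig now guards its maps with the callback-based
inReadLock/inWriteLock helpers instead of CoreUtils' by-name variants, and the
read-side methods still hand back copies rather than the live maps. A
self-contained, JDK-only sketch of that shape (all names illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import java.util.function.Supplier;

    // Sketch: writers take the write lock, readers take the read lock and
    // return a defensive copy, and both paths release the lock in finally.
    public class DynamicConfigSketch {
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
        private final Map<String, String> dynamicConfigs = new HashMap<>();

        private static <T> T inLock(Lock lock, Supplier<T> action) {
            lock.lock();
            try {
                return action.get();
            } finally {
                lock.unlock();
            }
        }

        public void update(String key, String value) {
            inLock(lock.writeLock(), () -> dynamicConfigs.put(key, value));
        }

        public Map<String, String> snapshot() {
            return inLock(lock.readLock(), () -> new HashMap<>(dynamicConfigs));
        }

        public static void main(String[] args) {
            DynamicConfigSketch configs = new DynamicConfigSketch();
            configs.update("log.retention.ms", "86400000");
            System.out.println(configs.snapshot());
        }
    }
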
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2092d36d7f9..65097f53a5c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -20,8 +20,9 @@ package kafka.server
 import java.util
 import java.util.concurrent.TimeUnit
 import java.util.Properties
-import kafka.utils.{CoreUtils, Logging}
+import kafka.utils.Logging
 import kafka.utils.Implicits._
+import org.apache.commons.validator.routines.InetAddressValidator
 import org.apache.kafka.common.{Endpoint, Reconfigurable}
 import org.apache.kafka.common.config.{ConfigDef, ConfigException, 
ConfigResource, TopicConfig}
 import org.apache.kafka.common.config.ConfigDef.ConfigKey
@@ -143,6 +144,81 @@ object KafkaConfig {
     }
     output
   }
+
+  def listenerListToEndPoints(listeners: java.util.List[String], 
securityProtocolMap: java.util.Map[ListenerName, SecurityProtocol]): 
Seq[Endpoint] = {
+    listenerListToEndPoints(listeners, securityProtocolMap, 
requireDistinctPorts = true)
+  }
+
+  private def checkDuplicateListenerPorts(endpoints: Seq[Endpoint], listeners: 
java.util.List[String]): Unit = {
+    val distinctPorts = endpoints.map(_.port).distinct
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
+  }
+
+  def listenerListToEndPoints(listeners: java.util.List[String], 
securityProtocolMap: java.util.Map[ListenerName, SecurityProtocol], 
requireDistinctPorts: Boolean): Seq[Endpoint] = {
+    def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean 
= {
+      val inetAddressValidator = InetAddressValidator.getInstance()
+      (inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
+        (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
+    }
+
+    def validate(endPoints: Seq[Endpoint]): Unit = {
+      val distinctListenerNames = endPoints.map(_.listener).distinct
+      require(distinctListenerNames.size == endPoints.size, s"Each listener 
must have a different name, listeners: $listeners")
+
+      val (duplicatePorts, _) = endPoints.filter {
+        // filter port 0 for unit tests
+        ep => ep.port != 0
+      }.groupBy(_.port).partition {
+        case (_, endpoints) => endpoints.size > 1
+      }
+
+      // Exception case, let's allow duplicate ports if one host is on IPv4 
and the other one is on IPv6
+      val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+        case (port, eps) =>
+          (port, eps.partition(ep =>
+            ep.host != null && 
InetAddressValidator.getInstance().isValid(ep.host)
+          ))
+      }
+
+      // Iterate through every grouping of duplicates by port to see if they 
are valid
+      duplicatePortsPartitionedByValidIps.foreach {
+        case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+          if (requireDistinctPorts)
+            checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)
+
+          duplicatesWithIpHosts match {
+            case eps if eps.isEmpty =>
+            case Seq(ep1, ep2) =>
+              if (requireDistinctPorts) {
+                val errorMessage = "If you have two listeners on " +
+                  s"the same port then one needs to be IPv4 and the other 
IPv6, listeners: $listeners, port: $port"
+                require(validateOneIsIpv4AndOtherIpv6(ep1.host, ep2.host), 
errorMessage)
+
+                // If we reach this point it means that even though 
duplicatesWithIpHosts in isolation can be valid, if
+                // there happens to be ANOTHER listener on this port without 
an IP host (such as a null host) then it's
+                // not valid.
+                if (duplicatesWithoutIpHosts.nonEmpty)
+                  throw new IllegalArgumentException(errorMessage)
+              }
+            case _ =>
+              // Having more than 2 duplicate endpoints doesn't make sense 
since we only have 2 IP stacks (one is IPv4
+              // and the other is IPv6)
+              if (requireDistinctPorts)
+                throw new IllegalArgumentException("Each listener must have a 
different port unless exactly one listener has " +
+                  s"an IPv4 address and the other IPv6 address, listeners: 
$listeners, port: $port")
+          }
+      }
+    }
+
+    val endPoints = try {
+      SocketServerConfigs.listenerListToEndPoints(listeners, 
securityProtocolMap).asScala
+    } catch {
+      case e: Exception =>
+        throw new IllegalArgumentException(s"Error creating broker listeners 
from '$listeners': ${e.getMessage}", e)
+    }
+    validate(endPoints)
+    endPoints
+  }
 }
 
 /**
@@ -444,7 +520,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   }
 
   def listeners: Seq[Endpoint] =
-    
CoreUtils.listenerListToEndPoints(getList(SocketServerConfigs.LISTENERS_CONFIG),
 effectiveListenerSecurityProtocolMap)
+    
KafkaConfig.listenerListToEndPoints(getList(SocketServerConfigs.LISTENERS_CONFIG),
 effectiveListenerSecurityProtocolMap)
 
   def controllerListeners: Seq[Endpoint] =
     listeners.filter(l => controllerListenerNames.contains(l.listener))
@@ -461,7 +537,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   def effectiveAdvertisedControllerListeners: Seq[Endpoint] = {
     val advertisedListenersProp = 
getList(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
     val controllerAdvertisedListeners = if (advertisedListenersProp != null) {
-      CoreUtils.listenerListToEndPoints(advertisedListenersProp, 
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
+      KafkaConfig.listenerListToEndPoints(advertisedListenersProp, 
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
         .filter(l => controllerListenerNames.contains(l.listener))
     } else {
       Seq.empty
@@ -491,7 +567,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
     // Use advertised listeners if defined, fallback to listeners otherwise
     val advertisedListenersProp = 
getList(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
     val advertisedListeners = if (advertisedListenersProp != null) {
-      CoreUtils.listenerListToEndPoints(advertisedListenersProp, 
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
+      KafkaConfig.listenerListToEndPoints(advertisedListenersProp, 
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
     } else {
       listeners
     }
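
Note: the listener validation moved into KafkaConfig above only allows two
listeners to share a port when one host is an IPv4 address and the other an
IPv6 address. A small standalone sketch of that specific check, using the same
commons-validator class the new code imports (the sketch's class and method
names are illustrative):

    import org.apache.commons.validator.routines.InetAddressValidator;

    // Sketch of the "shared port is OK only for one IPv4 plus one IPv6 host" rule.
    public class ListenerPortCheckSketch {
        private static final InetAddressValidator VALIDATOR = InetAddressValidator.getInstance();

        static boolean oneIpv4AndOneIpv6(String first, String second) {
            return (VALIDATOR.isValidInet4Address(first) && VALIDATOR.isValidInet6Address(second))
                || (VALIDATOR.isValidInet6Address(first) && VALIDATOR.isValidInet4Address(second));
        }

        public static void main(String[] args) {
            System.out.println(oneIpv4AndOneIpv6("127.0.0.1", "::1"));      // true: port may be shared
            System.out.println(oneIpv4AndOneIpv6("127.0.0.1", "10.0.0.1")); // false: duplicate port rejected
        }
    }
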
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala 
b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index e3497a6ff88..270b1f30a6e 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -18,10 +18,10 @@ package kafka.server
 
 import java.io.File
 import java.util.concurrent.CompletableFuture
-import kafka.utils.{CoreUtils, Logging, Mx4jLoader}
+import kafka.utils.{Logging, Mx4jLoader}
 import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.utils.{AppInfoParser, Time}
+import org.apache.kafka.common.utils.{AppInfoParser, Time, Utils}
 import org.apache.kafka.common.{KafkaException, Uuid}
 import org.apache.kafka.metadata.KafkaConfigSchema
 import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, 
BootstrapMetadata}
@@ -103,7 +103,7 @@ class KafkaRaftServer(
     // stops the raft client early on, which would disrupt broker shutdown.
     broker.foreach(_.shutdown())
     controller.foreach(_.shutdown())
-    CoreUtils.swallow(AppInfoParser.unregisterAppInfo(Server.MetricsPrefix, 
config.brokerId.toString, metrics), this)
+    Utils.swallow(this.logger.underlying, () => 
AppInfoParser.unregisterAppInfo(Server.MetricsPrefix, config.brokerId.toString, 
metrics))
   }
 
   override def awaitShutdown(): Unit = {
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala 
b/core/src/main/scala/kafka/server/SharedServer.scala
index 3acfc9bf0b9..a20745f35d8 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -20,7 +20,7 @@ package kafka.server
 import kafka.metrics.KafkaMetricsReporter
 import kafka.raft.KafkaRaftManager
 import kafka.server.Server.MetricsPrefix
-import kafka.utils.{CoreUtils, Logging, VerifiableProperties}
+import kafka.utils.{Logging, VerifiableProperties}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
@@ -365,7 +365,7 @@ class SharedServer(
     // Ideally, this would just resign our leadership, if we had it. But we 
don't have an API in
     // RaftManager for that yet, so shut down the RaftManager.
     Option(raftManager).foreach(_raftManager => {
-      CoreUtils.swallow(_raftManager.shutdown(), this)
+      Utils.swallow(this.logger.underlying, () => _raftManager.shutdown())
       raftManager = null
     })
   }
@@ -376,10 +376,10 @@ class SharedServer(
     } else {
       info("Stopping SharedServer")
       if (loader != null) {
-        CoreUtils.swallow(loader.beginShutdown(), this)
+        Utils.swallow(this.logger.underlying, () => loader.beginShutdown())
       }
       if (snapshotGenerator != null) {
-        CoreUtils.swallow(snapshotGenerator.beginShutdown(), this)
+        Utils.swallow(this.logger.underlying, () => 
snapshotGenerator.beginShutdown())
       }
       Utils.closeQuietly(loader, "loader")
       loader = null
@@ -388,7 +388,7 @@ class SharedServer(
       Utils.closeQuietly(snapshotGenerator, "snapshot generator")
       snapshotGenerator = null
       if (raftManager != null) {
-        CoreUtils.swallow(raftManager.shutdown(), this)
+        Utils.swallow(this.logger.underlying, () => raftManager.shutdown())
         raftManager = null
       }
       Utils.closeQuietly(controllerServerMetrics, "controller server metrics")
@@ -399,7 +399,7 @@ class SharedServer(
       nodeMetrics = null
       Utils.closeQuietly(metrics, "metrics")
       metrics = null
-      CoreUtils.swallow(AppInfoParser.unregisterAppInfo(MetricsPrefix, 
sharedServerConfig.nodeId.toString, metrics), this)
+      Utils.swallow(this.logger.underlying, () => 
AppInfoParser.unregisterAppInfo(MetricsPrefix, 
sharedServerConfig.nodeId.toString, metrics))
       started = false
     }
   }
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala 
b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 3f12d5c009e..8c3f48ef27b 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -24,7 +24,7 @@ import joptsimple.{OptionException, OptionSpec}
 import kafka.network.SocketServer
 import kafka.raft.KafkaRaftManager
 import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, 
KafkaRequestHandlerPoolFactory}
-import kafka.utils.{CoreUtils, Logging}
+import kafka.utils.Logging
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing
@@ -143,13 +143,13 @@ class TestRaftServer(
 
   def shutdown(): Unit = {
     if (raftManager != null)
-      CoreUtils.swallow(raftManager.shutdown(), this)
+      Utils.swallow(this.logger.underlying, () => raftManager.shutdown())
     if (workloadGenerator != null)
-      CoreUtils.swallow(workloadGenerator.shutdown(), this)
+      Utils.swallow(this.logger.underlying, () => workloadGenerator.shutdown())
     if (dataPlaneRequestHandlerPool != null)
-      CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
+      Utils.swallow(this.logger.underlying, () => 
dataPlaneRequestHandlerPool.shutdown())
     if (socketServer != null)
-      CoreUtils.swallow(socketServer.shutdown(), this)
+      Utils.swallow(this.logger.underlying, () => socketServer.shutdown())
     Utils.closeQuietly(metrics, "metrics")
     shutdownLatch.countDown()
   }
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala 
b/core/src/main/scala/kafka/utils/CoreUtils.scala
deleted file mode 100755
index 66f9bd48657..00000000000
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import java.io.File
-import java.util.concurrent.locks.{Lock, ReadWriteLock}
-import java.lang.management.ManagementFactory
-import com.typesafe.scalalogging.Logger
-
-import javax.management.ObjectName
-import scala.collection.Seq
-import org.apache.commons.validator.routines.InetAddressValidator
-import org.apache.kafka.common.Endpoint
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.network.SocketServerConfigs
-import org.slf4j.event.Level
-
-import scala.jdk.CollectionConverters._
-
-/**
- * General helper functions!
- *
- * This is for general helper functions that aren't specific to Kafka logic. 
Things that should have been included in
- * the standard library etc.
- *
- * If you are making a new helper function and want to add it to this class 
please ensure the following:
- * 1. It has documentation
- * 2. It is the most general possible utility, not just the thing you needed 
in one particular place
- * 3. You have tests for it if it is nontrivial in any way
- */
-object CoreUtils {
-  private val logger = Logger(getClass)
-
-  private val inetAddressValidator = InetAddressValidator.getInstance()
-
-  /**
-    * Do the given action and log any exceptions thrown without rethrowing 
them.
-    *
-    * @param action The action to execute.
-    * @param logging The logging instance to use for logging the thrown 
exception.
-    * @param logLevel The log level to use for logging.
-    */
-  @noinline // inlining this method is not typically useful and it triggers 
spurious spotbugs warnings
-  def swallow(action: => Unit, logging: Logging, logLevel: Level = 
Level.WARN): Unit = {
-    try {
-      action
-    } catch {
-      case e: Throwable => logLevel match {
-        case Level.ERROR => logging.error(e.getMessage, e)
-        case Level.WARN => logging.warn(e.getMessage, e)
-        case Level.INFO => logging.info(e.getMessage, e)
-        case Level.DEBUG => logging.debug(e.getMessage, e)
-        case Level.TRACE => logging.trace(e.getMessage, e)
-      }
-    }
-  }
-
-  /**
-   * Recursively delete the list of files/directories and any subfiles (if any 
exist)
-   * @param files list of files to be deleted
-   */
-  def delete(files: java.util.List[String]): Unit = files.forEach(f => 
Utils.delete(new File(f)))
-
-  /**
-   * Register the given mbean with the platform mbean server,
-   * unregistering any mbean that was there before. Note,
-   * this method will not throw an exception if the registration
-   * fails (since there is nothing you can do and it isn't fatal),
-   * instead it just returns false indicating the registration failed.
-   * @param mbean The object to register as an mbean
-   * @param name The name to register this mbean with
-   * @return true if the registration succeeded
-   */
-  def registerMBean(mbean: Object, name: String): Boolean = {
-    try {
-      val mbs = ManagementFactory.getPlatformMBeanServer
-      mbs synchronized {
-        val objName = new ObjectName(name)
-        if (mbs.isRegistered(objName))
-          mbs.unregisterMBean(objName)
-        mbs.registerMBean(mbean, objName)
-        true
-      }
-    } catch {
-      case e: Exception =>
-        logger.error(s"Failed to register Mbean $name", e)
-        false
-    }
-  }
-
-  /**
-   * Execute the given function inside the lock
-   */
-  def inLock[T](lock: Lock)(fun: => T): T = {
-    lock.lock()
-    try {
-      fun
-    } finally {
-      lock.unlock()
-    }
-  }
-
-  def inReadLock[T](lock: ReadWriteLock)(fun: => T): T = 
inLock[T](lock.readLock)(fun)
-
-  def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T = 
inLock[T](lock.writeLock)(fun)
-
-  def listenerListToEndPoints(listeners: java.util.List[String], 
securityProtocolMap: java.util.Map[ListenerName, SecurityProtocol]): 
Seq[Endpoint] = {
-    listenerListToEndPoints(listeners, securityProtocolMap, 
requireDistinctPorts = true)
-  }
-
-  private def checkDuplicateListenerPorts(endpoints: Seq[Endpoint], listeners: 
java.util.List[String]): Unit = {
-    val distinctPorts = endpoints.map(_.port).distinct
-    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
-  }
-
-  def listenerListToEndPoints(listeners: java.util.List[String], 
securityProtocolMap: java.util.Map[ListenerName, SecurityProtocol], 
requireDistinctPorts: Boolean): Seq[Endpoint] = {
-    def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
-      (inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
-        (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
-
-    def validate(endPoints: Seq[Endpoint]): Unit = {
-      val distinctListenerNames = endPoints.map(_.listener).distinct
-      require(distinctListenerNames.size == endPoints.size, s"Each listener 
must have a different name, listeners: $listeners")
-
-      val (duplicatePorts, _) = endPoints.filter {
-        // filter port 0 for unit tests
-        ep => ep.port != 0
-      }.groupBy(_.port).partition {
-        case (_, endpoints) => endpoints.size > 1
-      }
-
-      // Exception case, let's allow duplicate ports if one host is on IPv4 
and the other one is on IPv6
-      val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
-        case (port, eps) =>
-          (port, eps.partition(ep =>
-            ep.host != null && inetAddressValidator.isValid(ep.host)
-          ))
-      }
-
-      // Iterate through every grouping of duplicates by port to see if they 
are valid
-      duplicatePortsPartitionedByValidIps.foreach {
-        case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
-          if (requireDistinctPorts)
-            checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)
-
-          duplicatesWithIpHosts match {
-            case eps if eps.isEmpty =>
-            case Seq(ep1, ep2) =>
-              if (requireDistinctPorts) {
-                val errorMessage = "If you have two listeners on " +
-                  s"the same port then one needs to be IPv4 and the other 
IPv6, listeners: $listeners, port: $port"
-                require(validateOneIsIpv4AndOtherIpv6(ep1.host, ep2.host), 
errorMessage)
-
-                // If we reach this point it means that even though 
duplicatesWithIpHosts in isolation can be valid, if
-                // there happens to be ANOTHER listener on this port without 
an IP host (such as a null host) then its
-                // not valid.
-                if (duplicatesWithoutIpHosts.nonEmpty)
-                  throw new IllegalArgumentException(errorMessage)
-              }
-            case _ =>
-              // Having more than 2 duplicate endpoints doesn't make sense 
since we only have 2 IP stacks (one is IPv4
-              // and the other is IPv6)
-              if (requireDistinctPorts)
-                throw new IllegalArgumentException("Each listener must have a 
different port unless exactly one listener has " +
-                  s"an IPv4 address and the other IPv6 address, listeners: 
$listeners, port: $port")
-          }
-      }
-    }
-
-    val endPoints = try {
-      SocketServerConfigs.listenerListToEndPoints(listeners, 
securityProtocolMap).asScala
-    } catch {
-      case e: Exception =>
-        throw new IllegalArgumentException(s"Error creating broker listeners 
from '$listeners': ${e.getMessage}", e)
-    }
-    validate(endPoints)
-    endPoints
-  }
-}
diff --git a/core/src/main/scala/kafka/utils/Logging.scala 
b/core/src/main/scala/kafka/utils/Logging.scala
index e08a6873fc1..cc51cdcc422 100755
--- a/core/src/main/scala/kafka/utils/Logging.scala
+++ b/core/src/main/scala/kafka/utils/Logging.scala
@@ -18,6 +18,7 @@
 package kafka.utils
 
 import com.typesafe.scalalogging.Logger
+import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.logger.LoggingController
 import org.slf4j.{LoggerFactory, Marker, MarkerFactory}
 
@@ -29,7 +30,7 @@ object Log4jControllerRegistration {
 
   private def registerMBean(mbean: LoggingController, typeAttr: String): Unit 
= {
     try {
-      CoreUtils.registerMBean(mbean, s"kafka:type=$typeAttr")
+      Utils.registerMBean(mbean, s"kafka:type=$typeAttr")
       logger.info("Registered `kafka:type={}` MBean", typeAttr)
     } catch {
       case e: Exception => logger.warn("Couldn't register `kafka:type={}` 
MBean", typeAttr, e)
diff --git 
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala 
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 53bda6a67c9..d3821ee0eee 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -23,7 +23,7 @@ import java.util
 import java.util.{Locale, Optional, OptionalInt, Properties, stream}
 import java.util.concurrent.{CompletableFuture, TimeUnit}
 import javax.security.auth.login.Configuration
-import kafka.utils.{CoreUtils, Logging, TestInfoUtils, TestUtils}
+import kafka.utils.{Logging, TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.AdminClientUnitTestEnv
 import org.apache.kafka.clients.consumer.GroupProtocol
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
@@ -31,7 +31,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.utils.{Exit, Time}
+import org.apache.kafka.common.utils.{Exit, Time, Utils}
 import org.apache.kafka.common.{DirectoryId, Uuid}
 import 
org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID,
 REQUIRE_METADATA_LOG_DIR}
 import org.apache.kafka.metadata.properties.{MetaProperties, 
MetaPropertiesEnsemble, MetaPropertiesVersion}
@@ -119,15 +119,15 @@ class KRaftQuorumImplementation(
       broker
     } catch {
       case e: Throwable => {
-        if (broker != null) CoreUtils.swallow(broker.shutdown(), log)
-        CoreUtils.swallow(sharedServer.stopForBroker(), log)
+        if (broker != null) Utils.swallow(() => broker.shutdown())
+        Utils.swallow(() => sharedServer.stopForBroker())
         throw e
       }
     }
   }
 
   override def shutdown(): Unit = {
-    CoreUtils.swallow(controllerServer.shutdown(), log)
+    Utils.swallow(() => controllerServer.shutdown())
   }
 }
 
@@ -229,7 +229,7 @@ abstract class QuorumTestHarness extends Logging {
   def shutdownKRaftController(): Unit = {
     // Note that the RaftManager instance is left running; it will be shut 
down in tearDown()
     val kRaftQuorumImplementation = asKRaft()
-    CoreUtils.swallow(kRaftQuorumImplementation.controllerServer.shutdown(), 
kRaftQuorumImplementation.log)
+    Utils.swallow(this.logger.underlying, () => 
kRaftQuorumImplementation.controllerServer.shutdown())
   }
 
   def addFormatterSettings(formatter: Formatter): Unit = {}
@@ -327,8 +327,8 @@ abstract class QuorumTestHarness extends Logging {
       controllerServer.startup()
     } catch {
       case e: Throwable =>
-        if (controllerServer != null) 
CoreUtils.swallow(controllerServer.shutdown(), this)
-        CoreUtils.swallow(sharedServer.stopForController(), this)
+        if (controllerServer != null) Utils.swallow(this.logger.underlying, () 
=> controllerServer.shutdown())
+        Utils.swallow(this.logger.underlying, () => 
sharedServer.stopForController())
         throw e
     }
     new KRaftQuorumImplementation(
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala 
b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index 9eeed9b86f5..5026e7364fb 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -18,7 +18,7 @@
 package kafka.server
 
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.utils.{CoreUtils, Logging, TestUtils}
+import kafka.utils.{Logging, TestUtils}
 import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.{TopicIdPartition, Uuid}
@@ -29,6 +29,7 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataProvenance}
 import org.apache.kafka.metadata.KRaftMetadataCache
 import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, 
OffsetAndEpoch}
@@ -122,8 +123,8 @@ class LocalLeaderEndPointTest extends Logging {
 
   @AfterEach
   def tearDown(): Unit = {
-    CoreUtils.swallow(replicaManager.shutdown(checkpointHW = false), this)
-    CoreUtils.swallow(quotaManager.shutdown(), this)
+    Utils.swallow(this.logger.underlying, () => 
replicaManager.shutdown(checkpointHW = false))
+    Utils.swallow(this.logger.underlying, () => quotaManager.shutdown())
   }
 
   @Test
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index 42cebdcb300..61c8615d33d 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -69,11 +69,11 @@ abstract class AbstractCoordinatorConcurrencyTest[M <: 
CoordinatorMember] extend
 
   @AfterEach
   def tearDown(): Unit = {
-    CoreUtils.swallow(replicaManager.shutdown(false), this)
-    CoreUtils.swallow(executor.shutdownNow(), this)
+    Utils.swallow(this.logger.underlying, () => replicaManager.shutdown(false))
+    Utils.swallow(this.logger.underlying, () => executor.shutdownNow())
     Utils.closeQuietly(timer, "mock timer")
-    CoreUtils.swallow(scheduler.shutdown(), this)
-    CoreUtils.swallow(time.scheduler.shutdown(), this)
+    Utils.swallow(this.logger.underlying, () => scheduler.shutdown())
+    Utils.swallow(this.logger.underlying, () => time.scheduler.shutdown())
   }
 
   /**
diff --git 
a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala 
b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 33e85e30a7a..11271d17c93 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -24,7 +24,7 @@ import scala.util.Random
 import scala.jdk.CollectionConverters._
 import scala.collection.{Map, Seq}
 import kafka.server.{KafkaBroker, KafkaConfig, QuorumTestHarness}
-import kafka.utils.{CoreUtils, TestInfoUtils, TestUtils}
+import kafka.utils.{TestInfoUtils, TestUtils}
 import kafka.utils.TestUtils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
@@ -42,10 +42,13 @@ import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
 import com.yammer.metrics.core.Meter
+import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.metadata.LeaderConstants
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.logging.log4j.core.config.Configurator
 
+import java.io.File
+
 class UncleanLeaderElectionTest extends QuorumTestHarness {
   val brokerId1 = 0
   val brokerId2 = 1
@@ -89,7 +92,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
   @AfterEach
   override def tearDown(): Unit = {
     brokers.foreach(broker => shutdownBroker(broker))
-    brokers.foreach(broker => CoreUtils.delete(broker.config.logDirs))
+    brokers.foreach(broker => broker.config.logDirs.forEach(f => 
Utils.delete(new File(f))))
 
     // restore log levels
     Configurator.setLevel(kafkaApisLogger.getName, Level.ERROR)
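
Note: the tests above replace CoreUtils.delete(config.logDirs) with a direct
Utils.delete(new File(dir)) per log directory; Kafka's Utils.delete removes the
whole directory tree. A JDK-only sketch of the same recursive cleanup (names
illustrative):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Comparator;
    import java.util.stream.Stream;

    // Sketch of recursive directory deletion: walk the tree and delete the
    // deepest entries first so parents are empty by the time they are removed.
    public class RecursiveDeleteSketch {

        static void deleteRecursively(Path root) throws IOException {
            if (!Files.exists(root))
                return;
            try (Stream<Path> paths = Files.walk(root)) {
                paths.sorted(Comparator.reverseOrder())
                     .forEach(p -> p.toFile().delete());
            }
        }

        public static void main(String[] args) throws IOException {
            Path dir = Files.createTempDirectory("log-dir-sketch");
            Files.createFile(dir.resolve("00000000000000000000.log"));
            deleteRecursively(dir);
            System.out.println("deleted: " + !Files.exists(dir));
        }
    }
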
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 318a0c66140..d142a4e64de 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -18,7 +18,7 @@
 package kafka.log
 
 import kafka.server.KafkaConfig
-import kafka.utils.{CoreUtils, Logging, TestUtils}
+import kafka.utils.{Logging, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.config.TopicConfig
@@ -69,7 +69,7 @@ class LogCleanerTest extends Logging {
 
   @AfterEach
   def teardown(): Unit = {
-    CoreUtils.swallow(time.scheduler.shutdown(), this)
+    Utils.swallow(this.logger.underlying, () => time.scheduler.shutdown())
     Utils.delete(tmpdir)
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 34472d4ca68..1a58f52ea8d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -22,7 +22,7 @@ import kafka.coordinator.transaction.{InitProducerIdResult, 
TransactionCoordinat
 import kafka.network.RequestChannel
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.share.SharePartitionManager
-import kafka.utils.{CoreUtils, Logging, TestUtils}
+import kafka.utils.{Logging, TestUtils}
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
 import org.apache.kafka.clients.consumer.AcknowledgeType
@@ -157,9 +157,9 @@ class KafkaApisTest extends Logging {
 
   @AfterEach
   def tearDown(): Unit = {
-    CoreUtils.swallow(quotas.shutdown(), this)
+    Utils.swallow(this.logger.underlying, () => quotas.shutdown())
     if (kafkaApis != null)
-      CoreUtils.swallow(kafkaApis.close(), this)
+      Utils.swallow(this.logger.underlying, () => kafkaApis.close())
     TestUtils.clearYammerMetrics()
     metrics.close()
   }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 2e366a97117..2b1830e136d 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -21,7 +21,7 @@ import java.net.InetSocketAddress
 import java.util
 import java.util.{Arrays, Collections, Properties}
 import kafka.utils.TestUtils.assertBadConfigContainingMessage
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.TestUtils
 import org.apache.kafka.common.{Endpoint, Node}
 import org.apache.kafka.common.config.{AbstractConfig, ConfigException, 
SaslConfigs, SecurityConfig, SslConfigs, TopicConfig}
 import org.apache.kafka.common.metrics.Sensor
@@ -604,7 +604,7 @@ class KafkaConfigTest {
 
   private def listenerListToEndPoints(listenerList: java.util.List[String],
                               securityProtocolMap: util.Map[ListenerName, 
SecurityProtocol] = SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO) =
-    CoreUtils.listenerListToEndPoints(listenerList, securityProtocolMap)
+    KafkaConfig.listenerListToEndPoints(listenerList, securityProtocolMap)
 
   @Test
   def testListenerDefaults(): Unit = {
diff --git 
a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
index c8692661134..e4bcb3d49a5 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
@@ -18,14 +18,17 @@ package kafka.server
 
 import java.util
 import java.util.concurrent.atomic.AtomicReference
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.TestUtils
 import org.apache.kafka.common.metrics.{KafkaMetric, MetricsContext, 
MetricsReporter}
+import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.config.ServerConfigs
 import org.apache.kafka.server.metrics.MetricConfigs
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 import org.junit.jupiter.api.Assertions._
 
+import java.io.File
+
 
 object KafkaMetricsReporterTest {
   val setupError = new AtomicReference[String]("")
@@ -89,7 +92,7 @@ class KafkaMetricsReporterTest extends QuorumTestHarness {
   @AfterEach
   override def tearDown(): Unit = {
     broker.shutdown()
-    CoreUtils.delete(config.logDirs)
+    config.logDirs().forEach(f => Utils.delete(new File(f)))
     super.tearDown()
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala 
b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index cbc6df7180e..45e7cc2e18d 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -22,7 +22,7 @@ import java.util.Collections
 import java.util.concurrent.{ExecutionException, TimeUnit}
 import kafka.api.IntegrationTestHarness
 import kafka.utils.TestUtils.{Checkpoint, LogDirFailureType, Roll, 
waitUntilTrue}
-import kafka.utils.{CoreUtils, TestInfoUtils, TestUtils}
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
@@ -220,7 +220,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
     // Make log directory of the partition on the leader broker inaccessible 
by replacing it with a file
     val localLog = leaderBroker.replicaManager.localLogOrException(partition)
     val logDir = localLog.dir.getParentFile
-    CoreUtils.swallow(Utils.delete(logDir), this)
+    Utils.swallow(this.logger.underlying, () => Utils.delete(logDir))
     Files.createFile(logDir.toPath)
     assertTrue(logDir.isFile)
 
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 0c5010d7de9..08d2eb4299d 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.{CompletableFuture, Executors, 
LinkedBlockingQueue,
 import java.util.{Optional, Properties}
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.TestUtils.waitUntilTrue
-import kafka.utils.{CoreUtils, Logging, TestUtils}
+import kafka.utils.{Logging, TestUtils}
 import org.apache.kafka.common
 import org.apache.kafka.common.metadata.{FeatureLevelRecord, 
PartitionChangeRecord, PartitionRecord, RegisterBrokerRecord, TopicRecord}
 import org.apache.kafka.common.metrics.Metrics
@@ -68,14 +68,14 @@ class ReplicaManagerConcurrencyTest extends Logging {
 
   @AfterEach
   def cleanup(): Unit = {
-    CoreUtils.swallow(tasks.foreach(_.shutdown()), this)
-    CoreUtils.swallow(executor.shutdownNow(), this)
-    CoreUtils.swallow(executor.awaitTermination(5, TimeUnit.SECONDS), this)
-    CoreUtils.swallow(channel.shutdown(), this)
-    CoreUtils.swallow(replicaManager.shutdown(checkpointHW = false), this)
-    CoreUtils.swallow(quotaManagers.shutdown(), this)
+    Utils.swallow(this.logger.underlying, () => tasks.foreach(_.shutdown()))
+    Utils.swallow(this.logger.underlying, () => executor.shutdownNow())
+    Utils.swallow(this.logger.underlying, () => executor.awaitTermination(5, 
TimeUnit.SECONDS))
+    Utils.swallow(this.logger.underlying, () => channel.shutdown())
+    Utils.swallow(this.logger.underlying, () => 
replicaManager.shutdown(checkpointHW = false))
+    Utils.swallow(this.logger.underlying, () => quotaManagers.shutdown())
     Utils.closeQuietly(metrics, "metrics")
-    CoreUtils.swallow(time.scheduler.shutdown(), this)
+    Utils.swallow(this.logger.underlying, () => time.scheduler.shutdown())
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index c8a137a4db7..0f3125685ad 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -16,7 +16,7 @@
  */
 package kafka.server
 
-import kafka.utils.{CoreUtils, TestInfoUtils, TestUtils}
+import kafka.utils.{TestInfoUtils, TestUtils}
 
 import java.io.File
 import java.util.concurrent.CancellationException
@@ -25,7 +25,7 @@ import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.{IntegerDeserializer, 
IntegerSerializer, StringDeserializer, StringSerializer}
-import org.apache.kafka.common.utils.Exit
+import org.apache.kafka.common.utils.{Exit, Utils}
 import org.apache.kafka.metadata.BrokerState
 import org.apache.kafka.raft.KRaftConfigs
 import org.apache.kafka.server.config.ServerLogConfigs
@@ -177,7 +177,7 @@ class ServerShutdownTest extends KafkaServerTestHarness {
   def testShutdownWithKRaftControllerUnavailable(): Unit = {
     shutdownKRaftController()
     killBroker(0, Duration.ofSeconds(1))
-    CoreUtils.delete(broker.config.logDirs)
+    broker.config.logDirs.forEach(f => Utils.delete(new File(f)))
     verifyNonDaemonThreadsStatus()
   }
 
diff --git a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala 
b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
deleted file mode 100755
index 73a2403870f..00000000000
--- a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import java.util.concurrent.locks.ReentrantLock
-import java.util.regex.Pattern
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-import kafka.utils.CoreUtils.inLock
-import org.apache.kafka.common.KafkaException
-import org.slf4j.event.Level
-
-
-class CoreUtilsTest extends Logging {
-
-  val clusterIdPattern: Pattern = Pattern.compile("[a-zA-Z0-9_\\-]+")
-
-  @Test
-  def testSwallow(): Unit = {
-    var loggedMessage: Option[String] = None
-    val testLogging: Logging = new Logging {
-      override def info(msg: => String, e: => Throwable): Unit = {
-        loggedMessage = Some(msg+Level.INFO)
-      }
-      override def debug(msg: => String, e: => Throwable): Unit = {
-        loggedMessage = Some(msg+Level.DEBUG)
-      }
-      override def warn(msg: => String, e: => Throwable): Unit = {
-        loggedMessage = Some(msg+Level.WARN)
-      }
-      override def error(msg: => String, e: => Throwable): Unit = {
-        loggedMessage = Some(msg+Level.ERROR)
-      }
-      override def trace(msg: => String, e: => Throwable): Unit = {
-        loggedMessage = Some(msg+Level.TRACE)
-      }
-    }
-
-    CoreUtils.swallow(throw new KafkaException("test"), testLogging, 
Level.TRACE)
-    assertEquals(Some("test"+Level.TRACE), loggedMessage)
-    CoreUtils.swallow(throw new KafkaException("test"), testLogging, 
Level.DEBUG)
-    assertEquals(Some("test"+Level.DEBUG), loggedMessage)
-    CoreUtils.swallow(throw new KafkaException("test"), testLogging, 
Level.INFO)
-    assertEquals(Some("test"+Level.INFO), loggedMessage)
-    CoreUtils.swallow(throw new KafkaException("test"), testLogging, 
Level.WARN)
-    assertEquals(Some("test"+Level.WARN),loggedMessage)
-    CoreUtils.swallow(throw new KafkaException("test"), testLogging, 
Level.ERROR)
-    assertEquals(Some("test"+Level.ERROR),loggedMessage)
-  }
-
-  @Test
-  def testInLock(): Unit = {
-    val lock = new ReentrantLock()
-    val result = inLock(lock) {
-      assertTrue(lock.isHeldByCurrentThread, "Should be in lock")
-      1 + 1
-    }
-    assertEquals(2, result)
-    assertFalse(lock.isLocked, "Should be unlocked")
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index e53e10d80d1..e08bbe0be71 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -43,6 +43,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.resource.ResourcePattern
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.serialization._
+import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.utils.Utils.formatAddress
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
@@ -195,7 +196,7 @@ object TestUtils extends Logging {
     val future = Future.traverse(brokers) { s =>
       Future {
         s.shutdown()
-        if (deleteLogDirs) CoreUtils.delete(s.config.logDirs)
+        if (deleteLogDirs) s.config.logDirs.forEach(f => Utils.delete(new 
File(f)))
       }
     }
     Await.result(future, FiniteDuration(5, TimeUnit.MINUTES))
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java 
b/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java
index 86338726d5e..16bd77d944a 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java
@@ -18,6 +18,7 @@ package org.apache.kafka.server.util;
 
 import java.util.Objects;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
 
 /**
  * A utility class providing helper methods for working with {@link Lock} 
objects.
@@ -59,6 +60,22 @@ public class LockUtils {
         }
     }
 
+    /**
+     * Executes the given {@link ThrowingSupplier} within the context of the 
read lock of the specified {@link ReadWriteLock}.
+     * See {@link LockUtils#inLock(Lock, ThrowingSupplier)}.
+     */
+    public static <T, E extends Exception> T inReadLock(ReadWriteLock lock, 
ThrowingSupplier<T, E> supplier) throws E {
+        return inLock(lock.readLock(), supplier);
+    }
+
+    /**
+     * Executes the given {@link ThrowingSupplier} within the context of the 
write lock of the specified {@link ReadWriteLock}.
+     * See {@link LockUtils#inLock(Lock, ThrowingSupplier)}.
+     */
+    public static <T, E extends Exception> T inWriteLock(ReadWriteLock lock, 
ThrowingSupplier<T, E> supplier) throws E {
+        return inLock(lock.writeLock(), supplier);
+    }
+
     /**
      * Executes the given {@link ThrowingRunnable} within the context of the 
specified {@link Lock}.
      * The lock is acquired before executing the runnable and released after 
the execution,
@@ -81,4 +98,20 @@ public class LockUtils {
             lock.unlock();
         }
     }
+
+    /**
+     * Executes the given {@link ThrowingRunnable} within the context of the 
read lock of the specified {@link ReadWriteLock}.
+     * See {@link LockUtils#inLock(Lock, ThrowingRunnable)}.
+     */
+    public static <E extends Exception> void inReadLock(ReadWriteLock lock, 
ThrowingRunnable<E> runnable) throws E {
+        inLock(lock.readLock(), runnable);
+    }
+
+    /**
+     * Executes the given {@link ThrowingRunnable} within the context of the 
write lock of the specified {@link ReadWriteLock}.
+     * See {@link LockUtils#inLock(Lock, ThrowingRunnable)}.
+     */
+    public static <E extends Exception> void inWriteLock(ReadWriteLock lock, 
ThrowingRunnable<E> runnable) throws E {
+        inLock(lock.writeLock(), runnable);
+    }
 }
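
Note: for Java callers, the new ReadWriteLock overloads added above are used
the same way as the Scala call sites earlier in this patch. A usage sketch,
assuming server-common (which provides LockUtils) is on the classpath; the
guarded counter is purely illustrative:

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    import org.apache.kafka.server.util.LockUtils;

    // Usage sketch: the runnable overload is for side effects, the supplier
    // overload returns a value; both release the lock in a finally block.
    public class LockUtilsUsageSketch {
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
        private int counter = 0;

        public void increment() {
            LockUtils.inWriteLock(lock, () -> { counter++; });
        }

        public int current() {
            return LockUtils.inReadLock(lock, () -> counter);
        }

        public static void main(String[] args) {
            LockUtilsUsageSketch sketch = new LockUtilsUsageSketch();
            sketch.increment();
            System.out.println(sketch.current());
        }
    }
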
