This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0e4eebe9c00 KAFKA-12895 Drop support for Scala 2.12 in Kafka 4.0 
(#17313)
0e4eebe9c00 is described below

commit 0e4eebe9c000e9eefad6461fb502f59615d7318b
Author: TengYao Chi <[email protected]>
AuthorDate: Mon Oct 7 01:34:38 2024 +0800

    KAFKA-12895 Drop support for Scala 2.12 in Kafka 4.0 (#17313)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 LICENSE-binary                                     |  7 ---
 README.md                                          | 51 +-------------------
 build.gradle                                       | 31 ++----------
 .../src/main/scala/kafka/admin/ConfigCommand.scala |  2 +-
 .../scala/kafka/admin/ZkSecurityMigrator.scala     |  3 +-
 .../controller/ControllerChannelManager.scala      | 13 +++--
 .../scala/kafka/controller/ControllerContext.scala |  3 +-
 .../scala/kafka/controller/KafkaController.scala   | 17 ++++---
 .../kafka/controller/PartitionStateMachine.scala   |  3 +-
 .../kafka/controller/ReplicaStateMachine.scala     |  5 +-
 .../group/GroupCoordinatorAdapter.scala            | 11 ++---
 .../kafka/coordinator/group/GroupMetadata.scala    |  9 ++--
 .../coordinator/group/GroupMetadataManager.scala   | 13 +++--
 .../TransactionMarkerChannelManager.scala          |  3 +-
 .../transaction/TransactionStateManager.scala      |  7 ++-
 core/src/main/scala/kafka/log/LogManager.scala     |  3 +-
 .../main/scala/kafka/network/RequestChannel.scala  |  3 +-
 .../kafka/security/authorizer/AclAuthorizer.scala  | 13 +++--
 .../kafka/server/AbstractFetcherManager.scala      |  5 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |  7 ++-
 .../kafka/server/AddPartitionsToTxnManager.scala   |  3 +-
 core/src/main/scala/kafka/server/AuthHelper.scala  |  3 +-
 .../main/scala/kafka/server/ConfigHandler.scala    |  3 +-
 .../scala/kafka/server/DelayedDeleteRecords.scala  |  7 ++-
 .../main/scala/kafka/server/DelayedProduce.scala   |  7 ++-
 .../kafka/server/DelayedRemoteListOffsets.scala    |  7 ++-
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 13 +++--
 core/src/main/scala/kafka/server/KafkaApis.scala   | 15 +++---
 .../scala/kafka/server/RemoteLeaderEndPoint.scala  |  5 +-
 .../main/scala/kafka/server/ReplicaManager.scala   | 27 +++++------
 .../main/scala/kafka/server/ZkAdminManager.scala   |  5 +-
 .../kafka/server/metadata/ZkMetadataCache.scala    |  5 +-
 .../main/scala/kafka/tools/DumpLogSegments.scala   |  5 +-
 core/src/main/scala/kafka/utils/CoreUtils.scala    |  8 ----
 core/src/main/scala/kafka/utils/Implicits.scala    | 19 --------
 .../main/scala/kafka/utils/json/DecodeJson.scala   |  4 +-
 core/src/main/scala/kafka/zk/AdminZkClient.scala   |  3 +-
 .../test/java/kafka/admin/ConfigCommandTest.java   |  8 ++--
 .../integration/kafka/api/BaseConsumerTest.scala   | 42 ++++++++--------
 .../kafka/api/PlaintextConsumerAssignTest.scala    |  8 ++--
 .../kafka/server/IntegrationTestUtils.scala        |  3 +-
 .../unit/kafka/network/ConnectionQuotasTest.scala  |  9 ++--
 .../kafka/server/AbstractFetcherManagerTest.scala  |  3 +-
 .../server/AddPartitionsToTxnManagerTest.scala     |  3 +-
 .../unit/kafka/server/MockLeaderEndPoint.scala     |  3 +-
 .../kafka/server/ReplicaFetcherThreadTest.scala    |  1 -
 .../server/epoch/LeaderEpochIntegrationTest.scala  |  3 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  2 +-
 docs/upgrade.html                                  |  4 ++
 gradle/dependencies.gradle                         | 21 ++------
 gradle/spotbugs-exclude.xml                        | 56 ----------------------
 gradlewAll                                         |  6 ++-
 .../kafka/jmh/util/ConcurrentMapBenchmark.java     | 19 ++++++++
 release/release.py                                 |  4 +-
 .../kafka/common/test/KafkaClusterTestKit.java     |  1 -
 55 files changed, 175 insertions(+), 369 deletions(-)

diff --git a/LICENSE-binary b/LICENSE-binary
index 611909cba67..d067f3ffb52 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -226,7 +226,6 @@ jackson-jaxrs-json-provider-2.16.2
 jackson-module-afterburner-2.16.2
 jackson-module-jaxb-annotations-2.16.2
 jackson-module-scala_2.13-2.16.2
-jackson-module-scala_2.12-2.16.2
 jakarta.validation-api-2.0.2
 javassist-3.29.2-GA
 jetty-client-9.4.54.v20240208
@@ -257,15 +256,9 @@ opentelemetry-proto-1.0.0-alpha
 plexus-utils-3.5.1
 reload4j-1.2.25
 rocksdbjni-7.9.2
-scala-collection-compat_2.12-2.10.0
-scala-collection-compat_2.13-2.10.0
-scala-library-2.12.19
 scala-library-2.13.15
-scala-logging_2.12-3.9.5
 scala-logging_2.13-3.9.5
-scala-reflect-2.12.19
 scala-reflect-2.13.15
-scala-java8-compat_2.12-1.0.2
 scala-java8-compat_2.13-1.0.2
 snappy-java-1.1.10.5
 swagger-annotations-2.2.8
diff --git a/README.md b/README.md
index 6e90cac5c52..0be7bff7afa 100644
--- a/README.md
+++ b/README.md
@@ -11,9 +11,7 @@ the broker and tools has been deprecated since Apache Kafka 
3.7 and removal of b
 see 
[KIP-750](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308223)
 and
 
[KIP-1013](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789510)
 for more details).
 
-Scala 2.12 and 2.13 are supported and 2.13 is used by default. Scala 2.12 
support has been deprecated since
-Apache Kafka 3.0 and will be removed in Apache Kafka 4.0 (see 
[KIP-751](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218)
-for more details). See below for how to use a specific Scala version or all of 
the supported Scala versions.
+Scala 2.13 is the only supported version in Apache Kafka.
 
 ### Build a jar and run it ###
     ./gradlew jar
@@ -122,23 +120,6 @@ Using compiled files:
 ### Cleaning the build ###
     ./gradlew clean
 
-### Running a task with one of the Scala versions available (2.12.x or 2.13.x) 
###
-*Note that if building the jars with a version other than 2.13.x, you need to 
set the `SCALA_VERSION` variable or change it in `bin/kafka-run-class.sh` to 
run the quick start.*
-
-You can pass either the major version (eg 2.12) or the full version (eg 
2.12.7):
-
-    ./gradlew -PscalaVersion=2.12 jar
-    ./gradlew -PscalaVersion=2.12 test
-    ./gradlew -PscalaVersion=2.12 releaseTarGz
-
-### Running a task with all the scala versions enabled by default ###
-
-Invoke the `gradlewAll` script followed by the task(s):
-
-    ./gradlewAll test
-    ./gradlewAll jar
-    ./gradlewAll releaseTarGz
-
 ### Running a task for a specific project ###
 This is for `core`, `examples` and `clients`
 
@@ -162,24 +143,6 @@ The `eclipse` task has been configured to use 
`${project_dir}/build_eclipse` as
 build directory (`${project_dir}/bin`) clashes with Kafka's scripts directory 
and we don't use Gradle's build directory
 to avoid known issues with this configuration.
 
-### Publishing the jar for all versions of Scala and for all projects to maven 
###
-The recommended command is:
-
-    ./gradlewAll publish
-
-For backwards compatibility, the following also works:
-
-    ./gradlewAll uploadArchives
-
-Please note for this to work you should create/update 
`${GRADLE_USER_HOME}/gradle.properties` (typically, 
`~/.gradle/gradle.properties`) and assign the following variables
-
-    mavenUrl=
-    mavenUsername=
-    mavenPassword=
-    signing.keyId=
-    signing.password=
-    signing.secretKeyRingFile=
-
 ### Publishing the streams quickstart archetype artifact to maven ###
 For the Streams archetype project, one cannot use gradle to upload to maven; 
instead the `mvn deploy` command needs to be called at the quickstart folder:
 
@@ -209,22 +172,10 @@ Please note for this to work you should create/update 
user maven settings (typic
      </servers>
      ...
 
-
-### Installing ALL the jars to the local Maven repository ###
-The recommended command to build for both Scala 2.12 and 2.13 is:
-
-    ./gradlewAll publishToMavenLocal
-
-For backwards compatibility, the following also works:
-
-    ./gradlewAll install
-
 ### Installing specific projects to the local Maven repository ###
 
     ./gradlew -PskipSigning=true :streams:publishToMavenLocal
     
-If needed, you can specify the Scala version with `-PscalaVersion=2.13`.
-
 ### Building the test jar ###
     ./gradlew testJar
 
diff --git a/build.gradle b/build.gradle
index ec55a370a7b..242143794e4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -773,25 +773,11 @@ subprojects {
       scalaCompileOptions.additionalParameters += inlineFrom
     }
 
-    if (versions.baseScala != '2.12') {
-      scalaCompileOptions.additionalParameters += ["-opt-warnings", 
"-Xlint:strict-unsealed-patmat"]
-      // Scala 2.13.2 introduces compiler warnings suppression, which is a 
pre-requisite for -Xfatal-warnings
-      scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"]
-    }
-
-    // these options are valid for Scala versions < 2.13 only
-    // Scala 2.13 removes them, see https://github.com/scala/scala/pull/6502 
and https://github.com/scala/scala/pull/5969
-    if (versions.baseScala == '2.12') {
-      scalaCompileOptions.additionalParameters += [
-        "-Xlint:by-name-right-associative",
-        "-Xlint:nullary-override",
-        "-Xlint:unsound-match"
-      ]
-    }
+    scalaCompileOptions.additionalParameters += ["-opt-warnings", 
"-Xlint:strict-unsealed-patmat"]
+    // Scala 2.13.2 introduces compiler warnings suppression, which is a 
pre-requisite for -Xfatal-warnings
+    scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"]
 
-    // Scalac 2.12 `-release` requires Java 9 or higher, but Scala 2.13 
doesn't have that restriction
-    if (versions.baseScala == "2.13" || 
JavaVersion.current().isJava9Compatible())
-      scalaCompileOptions.additionalParameters += ["-release", 
String.valueOf(minJavaVersion)]
+    scalaCompileOptions.additionalParameters += ["-release", 
String.valueOf(minJavaVersion)]
 
     addParametersForTests(name, options)
 
@@ -1096,7 +1082,6 @@ project(':core') {
     implementation libs.joptSimple
     implementation libs.jose4j
     implementation libs.metrics
-    implementation libs.scalaCollectionCompat
     implementation libs.scalaJava8Compat
     // only needed transitively, but set it explicitly to ensure it has the 
same version as scala-library
     implementation libs.scalaReflect
@@ -2813,14 +2798,6 @@ project(':streams:streams-scala') {
     api project(':streams')
 
     api libs.scalaLibrary
-    if ( versions.baseScala == '2.12' ) {
-      // Scala-Collection-Compat isn't required when compiling with Scala 2.13 
or later,
-      // and having it in the dependencies could lead to classpath conflicts 
in Scala 3
-      // projects that use kafka-streams-kafka_2.13 (because we don't have a 
Scala 3 version yet)
-      // but also pull in scala-collection-compat_3 via another dependency.
-      // So we make sure to not include it in the dependencies.
-      api libs.scalaCollectionCompat
-    }
     testImplementation project(':group-coordinator')
     testImplementation project(':core')
     testImplementation project(':test-common')
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala 
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 6602d879759..4722311f6c0 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -628,7 +628,7 @@ object ConfigCommand extends Logging {
 
   private def describeQuotaConfigs(adminClient: Admin, entityTypes: 
List[String], entityNames: List[String]): Unit = {
     val quotaConfigs = getAllClientQuotasConfigs(adminClient, entityTypes, 
entityNames)
-    quotaConfigs.forKeyValue { (entity, entries) =>
+    quotaConfigs.foreachEntry { (entity, entries) =>
       val entityEntries = entity.entries.asScala
 
       def entitySubstr(entityType: String): Option[String] =
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala 
b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index 33fec627486..77662c3b114 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -19,7 +19,6 @@ package kafka.admin
 
 import joptsimple.{OptionSet, OptionSpec, OptionSpecBuilder}
 import kafka.server.KafkaConfig
-import kafka.utils.Implicits._
 import kafka.utils.{Logging, ToolsUtils}
 import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, 
ZkSecurityMigratorUtils}
 import org.apache.kafka.common.security.JaasUtils
@@ -130,7 +129,7 @@ object ZkSecurityMigrator extends Logging {
     // Now override any set system properties with explicitly-provided values 
from the config file
     // Emit INFO logs due to camel-case property names encouraging mistakes -- 
help people see mistakes they make
     info(s"Found ${zkTlsConfigFileProps.size()} ZooKeeper client configuration 
properties in file $filename")
-    zkTlsConfigFileProps.asScala.forKeyValue { (key, value) =>
+    zkTlsConfigFileProps.asScala.foreachEntry { (key, value) =>
       info(s"Setting $key")
       KafkaConfig.setZooKeeperClientProperty(zkClientConfig, key, value)
     }
diff --git 
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 27ca3c5e02e..cea7368378d 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -19,7 +19,6 @@ package kafka.controller
 import com.yammer.metrics.core.{Gauge, Timer}
 import kafka.cluster.Broker
 import kafka.server.KafkaConfig
-import kafka.utils.Implicits._
 import kafka.utils._
 import org.apache.kafka.clients._
 import org.apache.kafka.common._
@@ -524,11 +523,11 @@ abstract class 
AbstractControllerBrokerRequestBatch(config: KafkaConfig,
       else if (metadataVersion.isAtLeast(IBP_1_0_IV0)) 1
       else 0
 
-    leaderAndIsrRequestMap.forKeyValue { (broker, leaderAndIsrPartitionStates) 
=>
+    leaderAndIsrRequestMap.foreachEntry { (broker, 
leaderAndIsrPartitionStates) =>
       if (metadataInstance.liveOrShuttingDownBrokerIds.contains(broker)) {
         val leaderIds = mutable.Set.empty[Int]
         var numBecomeLeaders = 0
-        leaderAndIsrPartitionStates.forKeyValue { (topicPartition, state) =>
+        leaderAndIsrPartitionStates.foreachEntry { (topicPartition, state) =>
           leaderIds += state.leader
           val typeOfRequest = if (broker == state.leader) {
             numBecomeLeaders += 1
@@ -669,10 +668,10 @@ abstract class 
AbstractControllerBrokerRequestBatch(config: KafkaConfig,
         handleStopReplicaResponse(stopReplicaResponse, brokerId, 
partitionErrorsForDeletingTopics.toMap)
     }
 
-    stopReplicaRequestMap.forKeyValue { (brokerId, partitionStates) =>
+    stopReplicaRequestMap.foreachEntry { (brokerId, partitionStates) =>
       if (metadataInstance.liveOrShuttingDownBrokerIds.contains(brokerId)) {
         if (traceEnabled)
-          partitionStates.forKeyValue { (topicPartition, partitionState) =>
+          partitionStates.foreachEntry { (topicPartition, partitionState) =>
             stateChangeLog.trace(s"Sending StopReplica request $partitionState 
to " +
               s"broker $brokerId for partition $topicPartition")
           }
@@ -680,7 +679,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: 
KafkaConfig,
         val brokerEpoch = metadataInstance.liveBrokerIdAndEpochs(brokerId)
         if (stopReplicaRequestVersion >= 3) {
           val stopReplicaTopicState = mutable.Map.empty[String, 
StopReplicaTopicState]
-          partitionStates.forKeyValue { (topicPartition, partitionState) =>
+          partitionStates.foreachEntry { (topicPartition, partitionState) =>
             val topicState = 
stopReplicaTopicState.getOrElseUpdate(topicPartition.topic,
               new StopReplicaTopicState().setTopicName(topicPartition.topic))
             topicState.partitionStates().add(partitionState)
@@ -699,7 +698,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: 
KafkaConfig,
           val topicStatesWithDelete = mutable.Map.empty[String, 
StopReplicaTopicState]
           val topicStatesWithoutDelete = mutable.Map.empty[String, 
StopReplicaTopicState]
 
-          partitionStates.forKeyValue { (topicPartition, partitionState) =>
+          partitionStates.foreachEntry { (topicPartition, partitionState) =>
             val topicStates = if (partitionState.deletePartition()) {
               numPartitionStateWithDelete += 1
               topicStatesWithDelete
diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala 
b/core/src/main/scala/kafka/controller/ControllerContext.scala
index 2df4e0f78f0..10420716416 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -18,7 +18,6 @@
 package kafka.controller
 
 import kafka.cluster.Broker
-import kafka.utils.Implicits._
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderAndIsr
 
@@ -522,7 +521,7 @@ class ControllerContext extends ControllerChannelContext {
   }
 
   private def cleanPreferredReplicaImbalanceMetric(topic: String): Unit = {
-    partitionAssignments.getOrElse(topic, mutable.Map.empty).forKeyValue { 
(partition, replicaAssignment) =>
+    partitionAssignments.getOrElse(topic, mutable.Map.empty).foreachEntry { 
(partition, replicaAssignment) =>
       partitionLeadershipInfo.get(new TopicPartition(topic, 
partition)).foreach { leadershipInfo =>
         if (!hasPreferredLeader(replicaAssignment, leadershipInfo))
           preferredReplicaImbalanceCount -= 1
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index bd4df02ca3b..0f9d65ebb1c 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -26,7 +26,6 @@ import kafka.coordinator.transaction.ZkProducerIdManager
 import kafka.server._
 import kafka.server.metadata.ZkFinalizedFeatureCache
 import kafka.utils._
-import kafka.utils.Implicits._
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
 import kafka.zk.TopicZNode.TopicIdReplicaAssignment
 import kafka.zk.{FeatureZNodeStatus, _}
@@ -1030,7 +1029,7 @@ class KafkaController(val config: KafkaConfig,
 
   private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = 
controllerContext.allPartitions.toSeq): Unit = {
     val leaderIsrAndControllerEpochs = 
zkClient.getTopicPartitionStates(partitions)
-    leaderIsrAndControllerEpochs.forKeyValue { (partition, 
leaderIsrAndControllerEpoch) =>
+    leaderIsrAndControllerEpochs.foreachEntry { (partition, 
leaderIsrAndControllerEpoch) =>
       controllerContext.putPartitionLeadershipInfo(partition, 
leaderIsrAndControllerEpoch)
     }
   }
@@ -1297,7 +1296,7 @@ class KafkaController(val config: KafkaConfig,
       }.toMap.groupBy { case (_, assignedReplicas) => assignedReplicas.head }
 
     // for each broker, check if a preferred replica election needs to be 
triggered
-    preferredReplicasForTopicsByBrokers.forKeyValue { (leaderBroker, 
topicPartitionsForBroker) =>
+    preferredReplicasForTopicsByBrokers.foreachEntry { (leaderBroker, 
topicPartitionsForBroker) =>
       val topicsNotInPreferredReplica = topicPartitionsForBroker.filter { case 
(topicPartition, _) =>
         val leadershipInfo = 
controllerContext.partitionLeadershipInfo(topicPartition)
         leadershipInfo.exists(_.leaderAndIsr.leader != leaderBroker)
@@ -1776,7 +1775,7 @@ class KafkaController(val config: KafkaConfig,
       }
     } else if (partitionsToBeAdded.nonEmpty) {
       info(s"New partitions to be added $partitionsToBeAdded")
-      partitionsToBeAdded.forKeyValue { (topicPartition, assignedReplicas) =>
+      partitionsToBeAdded.foreachEntry { (topicPartition, assignedReplicas) =>
         controllerContext.updatePartitionFullReplicaAssignment(topicPartition, 
assignedReplicas)
       }
       onNewPartitionCreation(partitionsToBeAdded.keySet)
@@ -1821,7 +1820,7 @@ class KafkaController(val config: KafkaConfig,
       val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
       val partitionsToReassign = mutable.Map.empty[TopicPartition, 
ReplicaAssignment]
 
-      zkClient.getPartitionReassignment.forKeyValue { (tp, targetReplicas) =>
+      zkClient.getPartitionReassignment.foreachEntry { (tp, targetReplicas) =>
         maybeBuildReassignment(tp, Some(targetReplicas)) match {
           case Some(context) => partitionsToReassign.put(tp, context)
           case None => reassignmentResults.put(tp, new 
ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
@@ -1858,7 +1857,7 @@ class KafkaController(val config: KafkaConfig,
       val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
       val partitionsToReassign = mutable.Map.empty[TopicPartition, 
ReplicaAssignment]
 
-      reassignments.forKeyValue { (tp, targetReplicas) =>
+      reassignments.foreachEntry { (tp, targetReplicas) =>
         val maybeApiError = targetReplicas.flatMap(validateReplicas(tp, _))
         maybeApiError match {
           case None =>
@@ -2304,7 +2303,7 @@ class KafkaController(val config: KafkaConfig,
 
     // After we have returned the result of the `AlterPartition` request, we 
should check whether
     // there are any reassignments which can be completed by a successful ISR 
expansion.
-    partitionResponses.forKeyValue { (topicPartition, partitionResponse) =>
+    partitionResponses.foreachEntry { (topicPartition, partitionResponse) =>
       if 
(controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
         val isSuccessfulUpdate = partitionResponse.isRight
         if (isSuccessfulUpdate) {
@@ -2480,7 +2479,7 @@ class KafkaController(val config: KafkaConfig,
       partitionsToAlter.keySet
     )
 
-    partitionResponses.groupBy(_._1.topic).forKeyValue { (topicName, 
partitionResponses) =>
+    partitionResponses.groupBy(_._1.topic).foreachEntry { (topicName, 
partitionResponses) =>
       // Add each topic part to the response
       val topicResponse = if (useTopicsIds) {
         new AlterPartitionResponseData.TopicData()
@@ -2491,7 +2490,7 @@ class KafkaController(val config: KafkaConfig,
       }
       alterPartitionResponse.topics.add(topicResponse)
 
-      partitionResponses.forKeyValue { (tp, errorOrIsr) =>
+      partitionResponses.foreachEntry { (tp, errorOrIsr) =>
         // Add each partition part to the response (new ISR or error)
         errorOrIsr match {
           case Left(error) =>
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala 
b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 2af5381d9b1..c0b92b9c638 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -19,7 +19,6 @@ package kafka.controller
 import kafka.common.StateChangeFailedException
 import kafka.controller.Election._
 import kafka.server.KafkaConfig
-import kafka.utils.Implicits._
 import kafka.utils.Logging
 import kafka.zk.KafkaZkClient
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
@@ -437,7 +436,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
     val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result => 
result.topicPartition -> result.leaderAndIsr.get).toMap
     val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = 
zkClient.updateLeaderAndIsr(
       adjustedLeaderAndIsrs, controllerContext.epoch, 
controllerContext.epochZkVersion)
-    finishedUpdates.forKeyValue { (partition, result) =>
+    finishedUpdates.foreachEntry { (partition, result) =>
       result.foreach { leaderAndIsr =>
         val replicaAssignment = 
controllerContext.partitionFullReplicaAssignment(partition)
         val leaderIsrAndControllerEpoch = 
LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 
b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 5a299a469c0..406fff2b51b 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -18,7 +18,6 @@ package kafka.controller
 
 import kafka.common.StateChangeFailedException
 import kafka.server.KafkaConfig
-import kafka.utils.Implicits._
 import kafka.utils.Logging
 import kafka.zk.KafkaZkClient
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
@@ -110,7 +109,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
     if (replicas.nonEmpty) {
       try {
         controllerBrokerRequestBatch.newBatch()
-        replicas.groupBy(_.replica).forKeyValue { (replicaId, replicas) =>
+        replicas.groupBy(_.replica).foreachEntry { (replicaId, replicas) =>
           doHandleStateChanges(replicaId, replicas, targetState)
         }
         
controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
@@ -227,7 +226,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
           
controllerContext.partitionLeadershipInfo(replica.topicPartition).isDefined
         }
         val updatedLeaderIsrAndControllerEpochs = 
removeReplicasFromIsr(replicaId, 
replicasWithLeadershipInfo.map(_.topicPartition))
-        updatedLeaderIsrAndControllerEpochs.forKeyValue { (partition, 
leaderIsrAndControllerEpoch) =>
+        updatedLeaderIsrAndControllerEpochs.foreachEntry { (partition, 
leaderIsrAndControllerEpoch) =>
           stateLogger.info(s"Partition $partition state changed to 
$leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as 
part of transition to $OfflineReplica")
           if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
             val recipients = 
controllerContext.partitionReplicaAssignment(partition).filterNot(_ == 
replicaId)
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index 01756550569..fcf5766b577 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -18,7 +18,6 @@ package kafka.coordinator.group
 
 import kafka.common.OffsetAndMetadata
 import kafka.server.{KafkaConfig, ReplicaManager}
-import kafka.utils.Implicits.MapExtensionMethods
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, 
ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, 
DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, 
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, 
LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, 
ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, 
OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...]
 import org.apache.kafka.common.metrics.Metrics
@@ -281,7 +280,7 @@ private[group] class GroupCoordinatorAdapter(
     coordinator.handleDeleteGroups(
       groupIds.asScala.toSet,
       new RequestLocal(bufferSupplier)
-    ).forKeyValue { (groupId, error) =>
+    ).foreachEntry { (groupId, error) =>
       results.add(new DeleteGroupsResponseData.DeletableGroupResult()
         .setGroupId(groupId)
         .setErrorCode(error.code))
@@ -338,7 +337,7 @@ private[group] class GroupCoordinatorAdapter(
       val topicsList = new 
util.ArrayList[OffsetFetchResponseData.OffsetFetchResponseTopics]()
       val topicsMap = new mutable.HashMap[String, 
OffsetFetchResponseData.OffsetFetchResponseTopics]()
 
-      results.forKeyValue { (tp, offset) =>
+      results.foreachEntry { (tp, offset) =>
         val topic = topicsMap.get(tp.topic) match {
           case Some(topic) =>
             topic
@@ -378,7 +377,7 @@ private[group] class GroupCoordinatorAdapter(
       val response = new OffsetCommitResponseData()
       val byTopics = new mutable.HashMap[String, 
OffsetCommitResponseData.OffsetCommitResponseTopic]()
 
-      commitStatus.forKeyValue { (tp, error) =>
+      commitStatus.foreachEntry { (tp, error) =>
         val topic = byTopics.get(tp.topic) match {
           case Some(existingTopic) =>
             existingTopic
@@ -445,7 +444,7 @@ private[group] class GroupCoordinatorAdapter(
       val response = new TxnOffsetCommitResponseData()
       val byTopics = new mutable.HashMap[String, 
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic]()
 
-      results.forKeyValue { (tp, error) =>
+      results.foreachEntry { (tp, error) =>
         val topic = byTopics.get(tp.topic) match {
           case Some(existingTopic) =>
             existingTopic
@@ -546,7 +545,7 @@ private[group] class GroupCoordinatorAdapter(
       future.completeExceptionally(groupError.exception)
     } else {
       val response = new OffsetDeleteResponseData()
-      topicPartitionResults.forKeyValue { (topicPartition, error) =>
+      topicPartitionResults.foreachEntry { (topicPartition, error) =>
         var topic = response.topics.find(topicPartition.topic)
         if (topic == null) {
           topic = new 
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setName(topicPartition.topic)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 5bf69e9aa32..7d74512b712 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.locks.ReentrantLock
 
 import kafka.common.OffsetAndMetadata
 import kafka.utils.{CoreUtils, Logging, nonthreadsafe}
-import kafka.utils.Implicits._
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
 import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
@@ -651,7 +650,7 @@ private[group] class GroupMetadata(val groupId: String, 
initialState: GroupState
 
   def prepareOffsetCommit(offsets: Map[TopicIdPartition, OffsetAndMetadata]): 
Unit = {
     receivedConsumerOffsetCommits = true
-    offsets.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
+    offsets.foreachEntry { (topicIdPartition, offsetAndMetadata) =>
       pendingOffsetCommits += topicIdPartition.topicPartition -> 
offsetAndMetadata
     }
   }
@@ -662,7 +661,7 @@ private[group] class GroupMetadata(val groupId: String, 
initialState: GroupState
     val producerOffsets = 
pendingTransactionalOffsetCommits.getOrElseUpdate(producerId,
       mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
 
-    offsets.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
+    offsets.foreachEntry { (topicIdPartition, offsetAndMetadata) =>
       producerOffsets.put(topicIdPartition.topicPartition, 
CommitRecordMetadataAndOffset(None, offsetAndMetadata))
     }
   }
@@ -708,7 +707,7 @@ private[group] class GroupMetadata(val groupId: String, 
initialState: GroupState
     val pendingOffsetsOpt = 
pendingTransactionalOffsetCommits.remove(producerId)
     if (isCommit) {
       pendingOffsetsOpt.foreach { pendingOffsets =>
-        pendingOffsets.forKeyValue { (topicPartition, 
commitRecordMetadataAndOffset) =>
+        pendingOffsets.foreachEntry { (topicPartition, 
commitRecordMetadataAndOffset) =>
           if (commitRecordMetadataAndOffset.appendedBatchOffset.isEmpty)
             throw new IllegalStateException(s"Trying to complete a 
transactional offset commit for producerId $producerId " +
               s"and groupId $groupId even though the offset commit record 
itself hasn't been appended to the log.")
@@ -746,7 +745,7 @@ private[group] class GroupMetadata(val groupId: String, 
initialState: GroupState
   def removeOffsets(topicPartitions: Seq[TopicPartition]): 
immutable.Map[TopicPartition, OffsetAndMetadata] = {
     topicPartitions.flatMap { topicPartition =>
       pendingOffsetCommits.remove(topicPartition)
-      pendingTransactionalOffsetCommits.forKeyValue { (_, pendingOffsets) =>
+      pendingTransactionalOffsetCommits.foreachEntry { (_, pendingOffsets) =>
         pendingOffsets.remove(topicPartition)
       }
       val removedOffset = offsets.remove(topicPartition)
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 21bbae6487b..8db98654b94 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -30,7 +30,6 @@ import kafka.common.OffsetAndMetadata
 import 
kafka.coordinator.group.GroupMetadataManager.maybeConvertOffsetCommitError
 import kafka.server.ReplicaManager
 import kafka.utils.CoreUtils.inLock
-import kafka.utils.Implicits._
 import kafka.utils._
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.compress.Compression
@@ -393,7 +392,7 @@ class GroupMetadataManager(brokerId: Int,
       val responseError = group.inLock {
         if (status.error == Errors.NONE) {
           if (!group.is(Dead)) {
-            filteredOffsetMetadata.forKeyValue { (topicIdPartition, 
offsetAndMetadata) =>
+            filteredOffsetMetadata.foreachEntry { (topicIdPartition, 
offsetAndMetadata) =>
               if (isTxnOffsetCommit)
                 group.onTxnOffsetCommitAppend(producerId, topicIdPartition, 
CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
               else
@@ -409,7 +408,7 @@ class GroupMetadataManager(brokerId: Int,
           if (!group.is(Dead)) {
             if (!group.hasPendingOffsetCommitsFromProducer(producerId))
               removeProducerGroup(producerId, group.groupId)
-            filteredOffsetMetadata.forKeyValue { (topicIdPartition, 
offsetAndMetadata) =>
+            filteredOffsetMetadata.foreachEntry { (topicIdPartition, 
offsetAndMetadata) =>
               if (isTxnOffsetCommit)
                 group.failPendingTxnOffsetCommit(producerId, topicIdPartition)
               else
@@ -705,11 +704,11 @@ class GroupMetadataManager(brokerId: Int,
           }.partition { case (group, _) => loadedGroups.contains(group) }
 
         val pendingOffsetsByGroup = mutable.Map[String, mutable.Map[Long, 
mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]]()
-        pendingOffsets.forKeyValue { (producerId, producerOffsets) =>
+        pendingOffsets.foreachEntry { (producerId, producerOffsets) =>
           
producerOffsets.keySet.map(_.group).foreach(addProducerGroup(producerId, _))
           producerOffsets
             .groupBy(_._1.group)
-            .forKeyValue { (group, offsets) =>
+            .foreachEntry { (group, offsets) =>
               val groupPendingOffsets = 
pendingOffsetsByGroup.getOrElseUpdate(group, mutable.Map.empty[Long, 
mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
               val groupProducerOffsets = 
groupPendingOffsets.getOrElseUpdate(producerId, 
mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
               groupProducerOffsets ++= offsets.map { case 
(groupTopicPartition, offset) =>
@@ -878,7 +877,7 @@ class GroupMetadataManager(brokerId: Int,
 
           replicaManager.onlinePartition(appendPartition).foreach { partition 
=>
             val tombstones = ArrayBuffer.empty[SimpleRecord]
-            removedOffsets.forKeyValue { (topicPartition, offsetAndMetadata) =>
+            removedOffsets.foreachEntry { (topicPartition, offsetAndMetadata) 
=>
               trace(s"Removing expired/deleted offset and metadata for 
$groupId, $topicPartition: $offsetAndMetadata")
               val commitKey = GroupMetadataManager.offsetCommitKey(groupId, 
topicPartition)
               tombstones += new SimpleRecord(timestamp, commitKey, null)
@@ -971,7 +970,7 @@ class GroupMetadataManager(brokerId: Int,
   }
 
   private def removeGroupFromAllProducers(groupId: String): Unit = 
openGroupsForProducer synchronized {
-    openGroupsForProducer.forKeyValue { (_, groups) =>
+    openGroupsForProducer.foreachEntry { (_, groups) =>
       groups.remove(groupId)
     }
   }
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 410d7f044ee..afd645877d1 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -22,7 +22,6 @@ import 
kafka.coordinator.transaction.TransactionMarkerChannelManager.{LogAppendR
 import java.util
 import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, 
LinkedBlockingQueue}
 import kafka.server.{KafkaConfig, MetadataCache}
-import kafka.utils.Implicits._
 import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.clients._
 import org.apache.kafka.common.metrics.Metrics
@@ -147,7 +146,7 @@ class TxnMarkerQueue(@volatile var destination: Node) 
extends Logging {
   }
 
   def forEachTxnTopicPartition[B](f:(Int, 
BlockingQueue[PendingCompleteTxnAndMarkerEntry]) => B): Unit =
-    markersPerTxnTopicPartition.forKeyValue { (partition, queue) =>
+    markersPerTxnTopicPartition.foreachEntry { (partition, queue) =>
       if (!queue.isEmpty) f(partition, queue)
     }
 
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 094c4d950cc..5abd9768767 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import kafka.server.{MetadataCache, ReplicaManager}
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils.{Logging, Pool}
-import kafka.utils.Implicits._
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.ListTransactionsResponseData
@@ -237,7 +236,7 @@ class TransactionStateManager(brokerId: Int,
 
   private[transaction] def removeExpiredTransactionalIds(): Unit = {
     inReadLock(stateLock) {
-      transactionMetadataCache.forKeyValue { (partitionId, 
partitionCacheEntry) =>
+      transactionMetadataCache.foreachEntry { (partitionId, 
partitionCacheEntry) =>
         val transactionPartition = new 
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
         removeExpiredTransactionalIds(transactionPartition, 
partitionCacheEntry)
       }
@@ -250,7 +249,7 @@ class TransactionStateManager(brokerId: Int,
     tombstoneRecords: MemoryRecords
   ): Unit = {
     def removeFromCacheCallback(responses: collection.Map[TopicPartition, 
PartitionResponse]): Unit = {
-      responses.forKeyValue { (topicPartition, response) =>
+      responses.foreachEntry { (topicPartition, response) =>
         inReadLock(stateLock) {
           transactionMetadataCache.get(topicPartition.partition).foreach { 
txnMetadataCacheEntry =>
             expiredForPartition.foreach { idCoordinatorEpochAndMetadata =>
@@ -345,7 +344,7 @@ class TransactionStateManager(brokerId: Int,
         }
 
         val states = new 
java.util.ArrayList[ListTransactionsResponseData.TransactionState]
-        transactionMetadataCache.forKeyValue { (_, cache) =>
+        transactionMetadataCache.foreachEntry { (_, cache) =>
           cache.metadataPerTransactionalId.values.foreach { txnMetadata =>
             txnMetadata.inLock {
               if (shouldInclude(txnMetadata)) {
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index 176e510c6d0..a0e68364138 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -35,7 +35,6 @@ import scala.jdk.CollectionConverters._
 import scala.collection._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
-import kafka.utils.Implicits._
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.requests.{AbstractControlRequest, 
LeaderAndIsrRequest}
 import org.apache.kafka.image.TopicsImage
@@ -699,7 +698,7 @@ class LogManager(logDirs: Seq[File],
     }
 
     try {
-      jobs.forKeyValue { (dir, dirJobs) =>
+      jobs.foreachEntry { (dir, dirJobs) =>
         if (waitForAllToComplete(dirJobs,
           e => warn(s"There was an error in one of the threads during 
LogManager shutdown: ${e.getCause}"))) {
           val logs = logsInDir(localLogsByDir, dir)
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala
index ca4ab90957b..fc215adb3aa 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -24,7 +24,6 @@ import com.typesafe.scalalogging.Logger
 import kafka.network
 import kafka.server.KafkaConfig
 import kafka.utils.Logging
-import kafka.utils.Implicits._
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.message.EnvelopeResponseData
@@ -465,7 +464,7 @@ class RequestChannel(val queueSize: Int,
     requestQueue.take()
 
   def updateErrorMetrics(apiKey: ApiKeys, errors: collection.Map[Errors, 
Integer]): Unit = {
-    errors.forKeyValue { (error, count) =>
+    errors.foreachEntry { (error, count) =>
       metrics(apiKey.name).markErrorMeter(error, count)
     }
   }
diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala 
b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
index dd5df2f2da4..501c6e2ece3 100644
--- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.{CompletableFuture, 
CompletionStage}
 import com.typesafe.scalalogging.Logger
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils._
-import kafka.utils.Implicits._
 import kafka.zk._
 import org.apache.kafka.common.Endpoint
 import org.apache.kafka.common.acl._
@@ -107,7 +106,7 @@ object AclAuthorizer {
       // be sure to force creation since the zkSslClientEnable property in the 
kafkaConfig could be false
       val zkClientConfig = 
KafkaServer.zkClientConfigFromKafkaConfig(kafkaConfig, forceZkSslClientEnable = 
true)
       // add in any prefixed overlays
-      ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.forKeyValue { 
(kafkaProp, sysProp) =>
+      ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.foreachEntry { 
(kafkaProp, sysProp) =>
         configMap.get(AclAuthorizer.configPrefix + kafkaProp).foreach { 
prefixedValue =>
           zkClientConfig.setProperty(sysProp,
             if (kafkaProp == 
ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)
@@ -187,7 +186,7 @@ class AclAuthorizer extends Authorizer with Logging {
   override def configure(javaConfigs: util.Map[String, _]): Unit = {
     val configs = javaConfigs.asScala
     val props = new java.util.Properties()
-    configs.forKeyValue { (key, value) => props.put(key, value.toString.trim) }
+    configs.foreachEntry { (key, value) => props.put(key, value.toString.trim) 
}
 
     superUsers = configs.get(AclAuthorizer.SuperUsersProp).collect {
       case str: String if str.nonEmpty => str.split(";").map(s => 
SecurityUtils.parseKafkaPrincipal(s.trim)).toSet
@@ -251,7 +250,7 @@ class AclAuthorizer extends Authorizer with Logging {
 
     if (aclsToCreate.nonEmpty) {
       lock synchronized {
-        aclsToCreate.forKeyValue { (resource, aclsWithIndex) =>
+        aclsToCreate.foreachEntry { (resource, aclsWithIndex) =>
           try {
             updateResourceAcls(resource) { currentAcls =>
               val newAcls = aclsWithIndex.map { case (acl, _) => new 
AclEntry(acl.entry) }
@@ -299,7 +298,7 @@ class AclAuthorizer extends Authorizer with Logging {
         resource -> matchingFilters
       }.toMap.filter(_._2.nonEmpty)
 
-      resourcesToUpdate.forKeyValue { (resource, matchingFilters) =>
+      resourcesToUpdate.foreachEntry { (resource, matchingFilters) =>
         val resourceBindingsBeingDeleted = new mutable.HashMap[AclBinding, 
Int]()
         try {
           updateResourceAcls(resource) { currentAcls =>
@@ -334,7 +333,7 @@ class AclAuthorizer extends Authorizer with Logging {
 
   override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = {
     val aclBindings = new util.ArrayList[AclBinding]()
-    aclCache.forKeyValue { case (resource, versionedAcls) =>
+    aclCache.foreachEntry { case (resource, versionedAcls) =>
       versionedAcls.acls.foreach { acl =>
         val binding = new AclBinding(resource, acl.ace)
         if (filter.matches(binding))
@@ -552,7 +551,7 @@ class AclAuthorizer extends Authorizer with Logging {
     aclCacheSnapshot
       .from(new ResourcePattern(resourceType, resourceName, 
PatternType.PREFIXED))
       .to(new ResourcePattern(resourceType, resourceName.take(1), 
PatternType.PREFIXED))
-      .forKeyValue { (resource, acls) =>
+      .foreachEntry { (resource, acls) =>
         if (resourceName.startsWith(resource.name)) prefixed ++= acls.acls
       }
 
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index eced213734b..c584c987b01 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import kafka.cluster.BrokerEndPoint
-import kafka.utils.Implicits._
 import kafka.utils.Logging
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.utils.Utils
@@ -66,11 +65,11 @@ abstract class AbstractFetcherManager[T <: 
AbstractFetcherThread](val name: Stri
   def resizeThreadPool(newSize: Int): Unit = {
     def migratePartitions(newSize: Int): Unit = {
       val allRemovedPartitionsMap = mutable.Map[TopicPartition, 
InitialFetchState]()
-      fetcherThreadMap.forKeyValue { (id, thread) =>
+      fetcherThreadMap.foreachEntry { (id, thread) =>
         val partitionStates = thread.removeAllPartitions()
         if (id.fetcherId >= newSize)
           thread.shutdown()
-        partitionStates.forKeyValue { (topicPartition, currentFetchState) =>
+        partitionStates.foreachEntry { (topicPartition, currentFetchState) =>
             val initialFetchState = 
InitialFetchState(currentFetchState.topicId, thread.leader.brokerEndPoint(),
               currentLeaderEpoch = currentFetchState.currentLeaderEpoch,
               initOffset = currentFetchState.fetchOffset)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 119f9d01bae..73c89a94399 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -20,7 +20,6 @@ package kafka.server
 import com.yammer.metrics.core.Meter
 import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
 import kafka.utils.CoreUtils.inLock
-import kafka.utils.Implicits._
 import kafka.utils.{DelayedItem, Logging, Pool}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.PartitionStates
@@ -256,7 +255,7 @@ abstract class AbstractFetcherThread(name: String,
     val fetchOffsets = mutable.HashMap.empty[TopicPartition, 
OffsetTruncationState]
     val partitionsWithError = mutable.HashSet.empty[TopicPartition]
 
-    fetchedEpochs.forKeyValue { (tp, leaderEpochOffset) =>
+    fetchedEpochs.foreachEntry { (tp, leaderEpochOffset) =>
       if (partitionStates.contains(tp)) {
         Errors.forCode(leaderEpochOffset.errorCode) match {
           case Errors.NONE =>
@@ -329,7 +328,7 @@ abstract class AbstractFetcherThread(name: String,
     if (responseData.nonEmpty) {
       // process fetched data
       inLock(partitionMapLock) {
-        responseData.forKeyValue { (topicPartition, partitionData) =>
+        responseData.foreachEntry { (topicPartition, partitionData) =>
           Option(partitionStates.stateValue(topicPartition)).foreach { 
currentFetchState =>
             // It's possible that a partition is removed and re-added or 
truncated when there is a pending fetch request.
             // In this case, we only want to process the fetch response if the 
partition state is ready for fetch and
@@ -508,7 +507,7 @@ abstract class AbstractFetcherThread(name: String,
     try {
       failedPartitions.removeAll(initialFetchStates.keySet)
 
-      initialFetchStates.forKeyValue { (tp, initialFetchState) =>
+      initialFetchStates.foreachEntry { (tp, initialFetchState) =>
         val currentState = partitionStates.stateValue(tp)
         val updatedState = partitionFetchState(tp, initialFetchState, 
currentState)
         partitionStates.updateAndMoveToEnd(tp, updatedState)
diff --git a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala 
b/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala
index 90e8114583b..0b680e034b2 100644
--- a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala
+++ b/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import 
kafka.server.AddPartitionsToTxnManager.{VerificationFailureRateMetricName, 
VerificationTimeMsMetricName}
-import kafka.utils.Implicits.MapExtensionMethods
 import kafka.utils.Logging
 import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestCompletionHandler}
 import org.apache.kafka.common.internals.Topic
@@ -99,7 +98,7 @@ class AddPartitionsToTxnManager(
       callback(topicPartitions.map(tp => tp -> 
Errors.COORDINATOR_NOT_AVAILABLE).toMap)
     } else {
       val topicCollection = new AddPartitionsToTxnTopicCollection()
-      topicPartitions.groupBy(_.topic).forKeyValue { (topic, tps) =>
+      topicPartitions.groupBy(_.topic).foreachEntry { (topic, tps) =>
         topicCollection.add(new AddPartitionsToTxnTopic()
           .setName(topic)
           .setPartitions(tps.map(tp => Int.box(tp.partition)).toList.asJava))
diff --git a/core/src/main/scala/kafka/server/AuthHelper.scala 
b/core/src/main/scala/kafka/server/AuthHelper.scala
index 6e779ce9007..4d21fb43859 100644
--- a/core/src/main/scala/kafka/server/AuthHelper.scala
+++ b/core/src/main/scala/kafka/server/AuthHelper.scala
@@ -20,7 +20,6 @@ package kafka.server
 import java.lang.{Byte => JByte}
 import java.util.Collections
 import kafka.network.RequestChannel
-import kafka.utils.CoreUtils
 import org.apache.kafka.clients.admin.EndpointType
 import org.apache.kafka.common.acl.AclOperation
 import org.apache.kafka.common.acl.AclOperation.DESCRIBE
@@ -105,7 +104,7 @@ class AuthHelper(authorizer: Option[Authorizer]) {
                             logIfDenied: Boolean = true)(resourceName: T => 
String): Set[String] = {
     authorizer match {
       case Some(authZ) =>
-        val resourceNameToCount = 
CoreUtils.groupMapReduce(resources)(resourceName)(_ => 1)(_ + _)
+        val resourceNameToCount = resources.groupMapReduce(resourceName)(_ => 
1)(_ + _)
         val actions = resourceNameToCount.iterator.map { case (resourceName, 
count) =>
           val resource = new ResourcePattern(resourceType, resourceName, 
PatternType.LITERAL)
           new Action(operation, resource, count, logIfAllowed, logIfDenied)
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala 
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 77a3874a3dd..8e8c603f1d2 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -24,7 +24,6 @@ import kafka.log.UnifiedLog
 import kafka.network.ConnectionQuotas
 import kafka.server.Constants._
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.utils.Implicits._
 import kafka.utils.Logging
 import org.apache.kafka.server.config.{QuotaConfigs, ReplicationConfigs, 
ZooKeeperInternals}
 import org.apache.kafka.common.config.TopicConfig
@@ -64,7 +63,7 @@ class TopicConfigHandler(private val replicaManager: 
ReplicaManager,
     // Validate the configurations.
     val configNamesToExclude = excludedConfigs(topic, topicConfig)
     val props = new Properties()
-    topicConfig.asScala.forKeyValue { (key, value) =>
+    topicConfig.asScala.foreachEntry { (key, value) =>
       if (!configNamesToExclude.contains(key)) props.put(key, value)
     }
 
diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala 
b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
index 6d25bd1f5c8..0375cc8a072 100644
--- a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
+++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
@@ -20,7 +20,6 @@ package kafka.server
 
 import java.util.concurrent.TimeUnit
 
-import kafka.utils.Implicits._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.message.DeleteRecordsResponseData
 import org.apache.kafka.common.protocol.Errors
@@ -49,7 +48,7 @@ class DelayedDeleteRecords(delayMs: Long,
   extends DelayedOperation(delayMs) {
 
   // first update the acks pending variable according to the error code
-  deleteRecordsStatus.forKeyValue { (topicPartition, status) =>
+  deleteRecordsStatus.foreachEntry { (topicPartition, status) =>
     if (status.responseStatus.errorCode == Errors.NONE.code) {
       // Timeout error state will be cleared when required acks are received
       status.acksPending = true
@@ -70,7 +69,7 @@ class DelayedDeleteRecords(delayMs: Long,
    */
   override def tryComplete(): Boolean = {
     // check for each partition if it still has pending acks
-    deleteRecordsStatus.forKeyValue { (topicPartition, status) =>
+    deleteRecordsStatus.foreachEntry { (topicPartition, status) =>
       trace(s"Checking delete records satisfaction for $topicPartition, 
current status $status")
       // skip those partitions that have already been satisfied
       if (status.acksPending) {
@@ -106,7 +105,7 @@ class DelayedDeleteRecords(delayMs: Long,
   }
 
   override def onExpiration(): Unit = {
-    deleteRecordsStatus.forKeyValue { (topicPartition, status) =>
+    deleteRecordsStatus.foreachEntry { (topicPartition, status) =>
       if (status.acksPending) {
         DelayedDeleteRecordsMetrics.recordExpiration(topicPartition)
       }
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala 
b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 7a21a86260c..c6470c81358 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.Lock
 import com.typesafe.scalalogging.Logger
 import com.yammer.metrics.core.Meter
-import kafka.utils.Implicits._
 import kafka.utils.Pool
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
@@ -65,7 +64,7 @@ class DelayedProduce(delayMs: Long,
   override lazy val logger: Logger = DelayedProduce.logger
 
   // first update the acks pending variable according to the error code
-  produceMetadata.produceStatus.forKeyValue { (topicPartition, status) =>
+  produceMetadata.produceStatus.foreachEntry { (topicPartition, status) =>
     if (status.responseStatus.error == Errors.NONE) {
       // Timeout error state will be cleared when required acks are received
       status.acksPending = true
@@ -90,7 +89,7 @@ class DelayedProduce(delayMs: Long,
    */
   override def tryComplete(): Boolean = {
     // check for each partition if it still has pending acks
-    produceMetadata.produceStatus.forKeyValue { (topicPartition, status) =>
+    produceMetadata.produceStatus.foreachEntry { (topicPartition, status) =>
       trace(s"Checking produce satisfaction for $topicPartition, current 
status $status")
       // skip those partitions that have already been satisfied
       if (status.acksPending) {
@@ -119,7 +118,7 @@ class DelayedProduce(delayMs: Long,
   }
 
   override def onExpiration(): Unit = {
-    produceMetadata.produceStatus.forKeyValue { (topicPartition, status) =>
+    produceMetadata.produceStatus.foreachEntry { (topicPartition, status) =>
       if (status.acksPending) {
         debug(s"Expiring produce request for partition $topicPartition with 
status $status")
         DelayedProduceMetrics.recordExpiration(topicPartition)
diff --git a/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala 
b/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala
index 0da3cc20764..8fca0226a0b 100644
--- a/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala
+++ b/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala
@@ -18,7 +18,6 @@ package kafka.server
 
 import com.yammer.metrics.core.Meter
 import kafka.log.AsyncOffsetReadFutureHolder
-import kafka.utils.Implicits._
 import kafka.utils.Pool
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.ApiException
@@ -58,7 +57,7 @@ class DelayedRemoteListOffsets(delayMs: Long,
 
   // Mark the status as completed, if there is no async task to track.
   // If there is a task to track, then build the response as REQUEST_TIMED_OUT 
by default.
-  metadata.statusByPartition.forKeyValue { (topicPartition, status) =>
+  metadata.statusByPartition.foreachEntry { (topicPartition, status) =>
     status.completed = status.futureHolderOpt.isEmpty
     if (status.futureHolderOpt.isDefined) {
       status.responseOpt = Some(buildErrorResponse(Errors.REQUEST_TIMED_OUT, 
topicPartition.partition()))
@@ -70,7 +69,7 @@ class DelayedRemoteListOffsets(delayMs: Long,
    * Call-back to execute when a delayed operation gets expired and hence 
forced to complete.
    */
   override def onExpiration(): Unit = {
-    metadata.statusByPartition.forKeyValue { (topicPartition, status) =>
+    metadata.statusByPartition.foreachEntry { (topicPartition, status) =>
       if (!status.completed) {
         debug(s"Expiring list offset request for partition $topicPartition 
with status $status")
         status.futureHolderOpt.foreach(futureHolder => 
futureHolder.jobFuture.cancel(true))
@@ -100,7 +99,7 @@ class DelayedRemoteListOffsets(delayMs: Long,
    */
   override def tryComplete(): Boolean = {
     var completable = true
-    metadata.statusByPartition.forKeyValue { (partition, status) =>
+    metadata.statusByPartition.foreachEntry { (partition, status) =>
       if (!status.completed) {
         status.futureHolderOpt.foreach { futureHolder =>
           if (futureHolder.taskFuture.isDone) {
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index ffc338f7d18..f93f4c82349 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -26,7 +26,6 @@ import kafka.log.{LogCleaner, LogManager}
 import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.server.DynamicBrokerConfig._
 import kafka.utils.{CoreUtils, Logging}
-import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.Reconfigurable
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
@@ -399,7 +398,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
         props.setProperty(configName, passwordEncoder.encode(new 
Password(value)))
       }
     }
-    configProps.asScala.forKeyValue { (name, value) =>
+    configProps.asScala.foreachEntry { (name, value) =>
       if (isPasswordConfig(name))
         encodePassword(name, value)
     }
@@ -436,7 +435,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
       }
     }
 
-    props.asScala.forKeyValue { (name, value) =>
+    props.asScala.foreachEntry { (name, value) =>
       if (isPasswordConfig(name))
         decodePassword(name, value)
     }
@@ -451,7 +450,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
     val props = persistentProps.clone().asInstanceOf[Properties]
     if (props.asScala.keySet.exists(isPasswordConfig)) {
       maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderOldSecret).foreach 
{ passwordDecoder =>
-        persistentProps.asScala.forKeyValue { (configName, value) =>
+        persistentProps.asScala.foreachEntry { (configName, value) =>
           if (isPasswordConfig(configName) && value != null) {
             val decoded = try {
               Some(passwordDecoder.decode(value).value)
@@ -545,7 +544,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
     * `props` (even though `log.roll.hours` is secondary to `log.roll.ms`).
     */
   private def overrideProps(props: mutable.Map[String, String], propsOverride: 
mutable.Map[String, String]): Unit = {
-    propsOverride.forKeyValue { (k, v) =>
+    propsOverride.foreachEntry { (k, v) =>
       // Remove synonyms of `k` to ensure the right precedence is applied. But 
disable `matchListenerOverride`
       // so that base configs corresponding to listener configs are not 
removed. Base configs should not be removed
       // since they may be used by other listeners. It is ok to retain them in 
`props` since base configs cannot be
@@ -916,7 +915,7 @@ class DynamicMetricReporterState(brokerId: Int, config: 
KafkaConfig, metrics: Me
                               updatedConfigs: util.Map[String, _]): Unit = {
     val props = new util.HashMap[String, AnyRef]
     updatedConfigs.forEach((k, v) => props.put(k, v.asInstanceOf[AnyRef]))
-    propsOverride.forKeyValue((k, v) => props.put(k, v))
+    propsOverride.foreachEntry((k, v) => props.put(k, v))
     val reporters = 
dynamicConfig.currentKafkaConfig.getConfiguredInstances(reporterClasses, 
classOf[MetricsReporter], props)
 
     // Call notifyMetricsReporters first to satisfy the contract for 
MetricsReporter.contextChange,
@@ -1058,7 +1057,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends 
BrokerReconfigurable wi
     newAdvertisedListeners: Map[ListenerName, EndPoint]
   ): Boolean = {
     if (oldAdvertisedListeners.size != newAdvertisedListeners.size) return true
-    oldAdvertisedListeners.forKeyValue {
+    oldAdvertisedListeners.foreachEntry {
       case (oldListenerName, oldEndpoint) =>
         newAdvertisedListeners.get(oldListenerName) match {
           case None => return true
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2f508ef6dc7..c2f24381258 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -24,7 +24,6 @@ import kafka.server.QuotaFactory.{QuotaManagers, 
UnboundedQuota}
 import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
 import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
 import kafka.server.share.SharePartitionManager
-import kafka.utils.Implicits._
 import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.admin.AdminUtils
 import org.apache.kafka.clients.CommonClientConfigs
@@ -344,7 +343,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         partitionStates)
       // Clear the coordinator caches in case we were the leader. In the case 
of a reassignment, we
       // cannot rely on the LeaderAndIsr API for this since it is only sent to 
active replicas.
-      result.forKeyValue { (topicPartition, error) =>
+      result.foreachEntry { (topicPartition, error) =>
         if (error == Errors.NONE) {
           val partitionState = partitionStates(topicPartition)
           if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
@@ -662,7 +661,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       var errorInResponse = false
 
       val nodeEndpoints = new mutable.HashMap[Int, Node]
-      mergedResponseStatus.forKeyValue { (topicPartition, status) =>
+      mergedResponseStatus.foreachEntry { (topicPartition, status) =>
         if (status.error != Errors.NONE) {
           errorInResponse = true
           debug("Produce request with correlation id %d from client %s on 
partition %s failed due to %s".format(
@@ -732,7 +731,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     def processingStatsCallback(processingStats: FetchResponseStats): Unit = {
-      processingStats.forKeyValue { (tp, info) =>
+      processingStats.foreachEntry { (tp, info) =>
         updateRecordConversionStats(request, tp, info)
       }
     }
@@ -2208,7 +2207,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     // the callback for sending a DeleteRecordsResponse
     def sendResponseCallback(authorizedTopicResponses: Map[TopicPartition, 
DeleteRecordsPartitionResult]): Unit = {
       val mergedResponseStatus = authorizedTopicResponses ++ 
unauthorizedTopicResponses ++ nonExistingTopicResponses
-      mergedResponseStatus.forKeyValue { (topicPartition, status) =>
+      mergedResponseStatus.foreachEntry { (topicPartition, status) =>
         if (status.errorCode != Errors.NONE.code) {
           debug("DeleteRecordsRequest with correlation id %d from client %s on 
partition %s failed due to %s".format(
             request.header.correlationId,
@@ -2485,7 +2484,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           entriesPerPartition = controlRecords,
           requestLocal = requestLocal,
           responseCallback = errors => {
-            errors.forKeyValue { (tp, partitionResponse) =>
+            errors.foreachEntry { (tp, partitionResponse) =>
               markerResults.put(tp, partitionResponse.error)
             }
             maybeComplete()
@@ -3328,11 +3327,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         val electionResults = new util.ArrayList[ReplicaElectionResult]()
         adjustedResults
           .groupBy { case (tp, _) => tp.topic }
-          .forKeyValue { (topic, ps) =>
+          .foreachEntry { (topic, ps) =>
             val electionResult = new ReplicaElectionResult()
 
             electionResult.setTopic(topic)
-            ps.forKeyValue { (topicPartition, error) =>
+            ps.foreachEntry { (topicPartition, error) =>
               val partitionResult = new PartitionResult()
               partitionResult.setPartitionId(topicPartition.partition)
               partitionResult.setErrorCode(error.error.code)
diff --git a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala 
b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
index edf514cb77e..3cdff49d408 100644
--- a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
@@ -21,7 +21,6 @@ import kafka.cluster.BrokerEndPoint
 
 import java.util.{Collections, Optional}
 import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
-import kafka.utils.Implicits.MapExtensionMethods
 import kafka.utils.Logging
 import org.apache.kafka.clients.FetchSessionHandler
 import org.apache.kafka.common.errors.KafkaStorageException
@@ -141,7 +140,7 @@ class RemoteLeaderEndPoint(logPrefix: String,
     }
 
     val topics = new OffsetForLeaderTopicCollection(partitions.size)
-    partitions.forKeyValue { (topicPartition, epochData) =>
+    partitions.foreachEntry { (topicPartition, epochData) =>
       var topic = topics.find(topicPartition.topic)
       if (topic == null) {
         topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic)
@@ -182,7 +181,7 @@ class RemoteLeaderEndPoint(logPrefix: String,
     val partitionsWithError = mutable.Set[TopicPartition]()
 
     val builder = fetchSessionHandler.newBuilder(partitions.size, false)
-    partitions.forKeyValue { (topicPartition, fetchState) =>
+    partitions.foreachEntry { (topicPartition, fetchState) =>
       // We will not include a replica in the fetch request if it should be 
throttled.
       if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, 
fetchState, topicPartition)) {
         try {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 58873cad312..65da75c8865 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -25,7 +25,6 @@ import kafka.server.HostedPartition.Online
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, 
FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, 
IsrShrinksPerSecMetricName, LeaderCountMetricName, 
OfflineReplicaCountMetricName, PartitionCountMetricName, 
PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, 
ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, 
UnderReplicatedPartitionsMetricName, createLogReadResult, 
isListOffsetsTimestampUnsupported}
 import kafka.server.metadata.ZkMetadataCache
-import kafka.utils.Implicits._
 import kafka.utils._
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.errors._
@@ -436,7 +435,7 @@ class ReplicaManager(val config: KafkaConfig,
 
     // First, stop the partitions. This will shutdown the fetchers and other 
managers
     val partitionsToStop = strayPartitions.map(tp => StopPartition(tp, 
deleteLocalLog = false)).toSet
-    stopPartitions(partitionsToStop).forKeyValue { (topicPartition, exception) 
=>
+    stopPartitions(partitionsToStop).foreachEntry { (topicPartition, 
exception) =>
       error(s"Unable to stop stray partition $topicPartition", exception)
     }
 
@@ -512,7 +511,7 @@ class ReplicaManager(val config: KafkaConfig,
       stateChangeLogger.info(s"Handling StopReplica request correlationId 
$correlationId from controller " +
         s"$controllerId for ${partitionStates.size} partitions")
       if (stateChangeLogger.isTraceEnabled)
-        partitionStates.forKeyValue { (topicPartition, partitionState) =>
+        partitionStates.foreachEntry { (topicPartition, partitionState) =>
           stateChangeLogger.trace(s"Received StopReplica request 
$partitionState " +
             s"correlation id $correlationId from controller $controllerId " +
             s"epoch $controllerEpoch for partition $topicPartition")
@@ -529,7 +528,7 @@ class ReplicaManager(val config: KafkaConfig,
         this.controllerEpoch = controllerEpoch
 
         val stoppedPartitions = mutable.Buffer.empty[StopPartition]
-        partitionStates.forKeyValue { (topicPartition, partitionState) =>
+        partitionStates.foreachEntry { (topicPartition, partitionState) =>
           val deletePartition = partitionState.deletePartition()
 
           getPartition(topicPartition) match {
@@ -836,7 +835,7 @@ class ReplicaManager(val config: KafkaConfig,
 
     val transactionalProducerInfo = mutable.HashSet[(Long, Short)]()
     val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]()
-    entriesPerPartition.forKeyValue { (topicPartition, records) =>
+    entriesPerPartition.foreachEntry { (topicPartition, records) =>
       // Produce requests (only requests that require verification) should 
only have one batch per partition in "batches" but check all just to be safe.
       val transactionalBatches = records.batches.asScala.filter(batch => 
batch.hasProducerId && batch.isTransactional)
       transactionalBatches.foreach(batch => 
transactionalProducerInfo.add(batch.producerId, batch.producerEpoch))
@@ -2203,14 +2202,14 @@ class ReplicaManager(val config: KafkaConfig,
 
           val data = new 
LeaderAndIsrResponseData().setErrorCode(Errors.NONE.code)
           if (leaderAndIsrRequest.version < 5) {
-            responseMap.forKeyValue { (tp, error) =>
+            responseMap.foreachEntry { (tp, error) =>
               data.partitionErrors.add(new LeaderAndIsrPartitionError()
                 .setTopicName(tp.topic)
                 .setPartitionIndex(tp.partition)
                 .setErrorCode(error.code))
             }
           } else {
-            responseMap.forKeyValue { (tp, error) =>
+            responseMap.foreachEntry { (tp, error) =>
               val topicId = topicIds.get(tp.topic)
               var topic = data.topics.find(topicId)
               if (topic == null) {
@@ -2335,7 +2334,7 @@ class ReplicaManager(val config: KafkaConfig,
         s"controller $controllerId epoch $controllerEpoch as part of the 
become-leader transition for " +
         s"${partitionStates.size} partitions")
       // Update the partition information to be the leader
-      partitionStates.forKeyValue { (partition, partitionState) =>
+      partitionStates.foreachEntry { (partition, partitionState) =>
         try {
           if (partition.makeLeader(partitionState, highWatermarkCheckpoints, 
topicIds(partitionState.topicName))) {
             partitionsToMakeLeaders += partition
@@ -2400,7 +2399,7 @@ class ReplicaManager(val config: KafkaConfig,
                             highWatermarkCheckpoints: OffsetCheckpoints,
                             topicIds: String => Option[Uuid]) : Set[Partition] 
= {
     val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
-    partitionStates.forKeyValue { (partition, partitionState) =>
+    partitionStates.foreachEntry { (partition, partitionState) =>
       if (traceLoggingEnabled)
         stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId 
$correlationId from controller $controllerId " +
           s"epoch $controllerEpoch starting the become-follower transition for 
partition ${partition.topicPartition} with leader " +
@@ -2410,7 +2409,7 @@ class ReplicaManager(val config: KafkaConfig,
 
     val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
     try {
-      partitionStates.forKeyValue { (partition, partitionState) =>
+      partitionStates.foreachEntry { (partition, partitionState) =>
         val newLeaderBrokerId = partitionState.leader
         try {
           if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
@@ -2868,7 +2867,7 @@ class ReplicaManager(val config: KafkaConfig,
           }
           .toSet
         stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).")
-        stopPartitions(deletes).forKeyValue { (topicPartition, e) =>
+        stopPartitions(deletes).foreachEntry { (topicPartition, e) =>
           if (e.isInstanceOf[KafkaStorageException]) {
             stateChangeLogger.error(s"Unable to delete replica $topicPartition 
because " +
               "the local replica for the partition is in an offline log 
directory")
@@ -2918,7 +2917,7 @@ class ReplicaManager(val config: KafkaConfig,
     stateChangeLogger.info(s"Transitioning ${localLeaders.size} partition(s) 
to " +
       "local leaders.")
     replicaFetcherManager.removeFetcherForPartitions(localLeaders.keySet)
-    localLeaders.forKeyValue { (tp, info) =>
+    localLeaders.foreachEntry { (tp, info) =>
       getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, 
isNew) =>
         try {
           val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
@@ -2953,7 +2952,7 @@ class ReplicaManager(val config: KafkaConfig,
     val partitionsToStartFetching = new mutable.HashMap[TopicPartition, 
Partition]
     val partitionsToStopFetching = new mutable.HashMap[TopicPartition, Boolean]
     val followerTopicSet = new mutable.HashSet[String]
-    localFollowers.forKeyValue { (tp, info) =>
+    localFollowers.foreachEntry { (tp, info) =>
       getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, 
isNew) =>
         try {
           followerTopicSet.add(tp.topic)
@@ -3006,7 +3005,7 @@ class ReplicaManager(val config: KafkaConfig,
       val listenerName = config.interBrokerListenerName.value
       val partitionAndOffsets = new mutable.HashMap[TopicPartition, 
InitialFetchState]
 
-      partitionsToStartFetching.forKeyValue { (topicPartition, partition) =>
+      partitionsToStartFetching.foreachEntry { (topicPartition, partition) =>
         val nodeOpt = partition.leaderReplicaIdOpt
           .flatMap(leaderId => Option(newImage.cluster.broker(leaderId)))
           .flatMap(_.node(listenerName).asScala)
diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala 
b/core/src/main/scala/kafka/server/ZkAdminManager.scala
index 748a0776462..238c7304ad4 100644
--- a/core/src/main/scala/kafka/server/ZkAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -22,7 +22,6 @@ import kafka.common.TopicAlreadyMarkedForDeletionException
 import kafka.server.ConfigAdminManager.{prepareIncrementalConfigs, 
toLoggableProps}
 import kafka.server.metadata.ZkConfigRepository
 import kafka.utils._
-import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.admin.AdminUtils
 import org.apache.kafka.clients.admin.{AlterConfigOp, ScramMechanism}
@@ -963,7 +962,7 @@ class ZkAdminManager(val config: KafkaConfig,
         }
       ).toMap
 
-    illegalRequestsByUser.forKeyValue { (user, errorMessage) =>
+    illegalRequestsByUser.foreachEntry { (user, errorMessage) =>
       retval.results.add(new AlterUserScramCredentialsResult().setUser(user)
         .setErrorCode(if (errorMessage == unknownScramMechanismMsg) 
{Errors.UNSUPPORTED_SASL_MECHANISM.code} else 
{Errors.UNACCEPTABLE_CREDENTIAL.code})
         .setErrorMessage(errorMessage)) }
@@ -1028,7 +1027,7 @@ class ZkAdminManager(val config: KafkaConfig,
     }).collect { case (user: String, exception: Exception) => (user, 
exception) }.toMap
 
     // report failures
-    usersFailedToPrepareProperties.++(usersFailedToPersist).forKeyValue { 
(user, exception) =>
+    usersFailedToPrepareProperties.++(usersFailedToPersist).foreachEntry { 
(user, exception) =>
       val error = Errors.forException(exception)
       retval.results.add(new AlterUserScramCredentialsResult()
         .setUser(user)
diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala 
b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
index 7297b87c593..ca41ebda7f3 100755
--- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
@@ -27,7 +27,6 @@ import kafka.controller.StateChangeLogger
 import kafka.server.{BrokerFeatures, CachedControllerId, 
KRaftCachedControllerId, MetadataCache, ZkCachedControllerId}
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
-import kafka.utils.Implicits._
 import org.apache.kafka.admin.BrokerMetadata
 import org.apache.kafka.common.internals.Topic
 import 
org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataPartitionState,
 UpdateMetadataTopicState}
@@ -74,7 +73,7 @@ object ZkMetadataCache {
     val topicIdToNewState = new util.HashMap[Uuid, UpdateMetadataTopicState]()
     requestTopicStates.forEach(state => topicIdToNewState.put(state.topicId(), 
state))
     val newRequestTopicStates = new util.ArrayList[UpdateMetadataTopicState]()
-    currentMetadata.topicNames.forKeyValue((id, name) => {
+    currentMetadata.topicNames.foreachEntry((id, name) => {
       try {
         Option(topicIdToNewState.get(id)) match {
           case None =>
@@ -560,7 +559,7 @@ class ZkMetadataCache(
       } else {
         //since kafka may do partial metadata updates, we start by copying the 
previous state
         val partitionStates = new mutable.AnyRefMap[String, 
mutable.LongMap[UpdateMetadataPartitionState]](metadataSnapshot.partitionStates.size)
-        metadataSnapshot.partitionStates.forKeyValue { (topic, 
oldPartitionStates) =>
+        metadataSnapshot.partitionStates.foreachEntry { (topic, 
oldPartitionStates) =>
           val copy = new 
mutable.LongMap[UpdateMetadataPartitionState](oldPartitionStates.size)
           copy ++= oldPartitionStates
           partitionStates(topic) = copy
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala 
b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 57f84e39825..d4c49251232 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -23,7 +23,6 @@ import java.io._
 import com.fasterxml.jackson.databind.node.{IntNode, JsonNodeFactory, 
ObjectNode, TextNode}
 import kafka.coordinator.transaction.TransactionLog
 import kafka.log._
-import kafka.utils.Implicits._
 import kafka.utils.{CoreUtils, VerifiableProperties}
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.message.ConsumerProtocolAssignment
@@ -90,7 +89,7 @@ object DumpLogSegments {
       }
     }
 
-    misMatchesForIndexFilesMap.forKeyValue { (fileName, listOfMismatches) =>
+    misMatchesForIndexFilesMap.foreachEntry { (fileName, listOfMismatches) =>
       System.err.println(s"Mismatches in :$fileName")
       listOfMismatches.foreach { case (indexOffset, logOffset) =>
         System.err.println(s"  Index offset: $indexOffset, log offset: 
$logOffset")
@@ -99,7 +98,7 @@ object DumpLogSegments {
 
     timeIndexDumpErrors.printErrors()
 
-    nonConsecutivePairsForLogFilesMap.forKeyValue { (fileName, 
listOfNonConsecutivePairs) =>
+    nonConsecutivePairsForLogFilesMap.foreachEntry { (fileName, 
listOfNonConsecutivePairs) =>
       System.err.println(s"Non-consecutive offsets in $fileName")
       listOfNonConsecutivePairs.foreach { case (first, second) =>
         System.err.println(s"  $first is followed by $second")
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala 
b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 710bbddbd50..29c89c29898 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -36,7 +36,6 @@ import org.apache.kafka.network.SocketServerConfigs
 import org.slf4j.event.Level
 
 import java.util
-import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 
 /**
@@ -260,13 +259,6 @@ object CoreUtils {
     }
   }
 
-  @nowarn("cat=unused") // see below for explanation
-  def groupMapReduce[T, K, B](elements: Iterable[T])(key: T => K)(f: T => 
B)(reduce: (B, B) => B): Map[K, B] = {
-    // required for Scala 2.12 compatibility, unused in Scala 2.13 and hence 
we need to suppress the unused warning
-    import scala.collection.compat._
-    elements.groupMapReduce(key)(f)(reduce)
-  }
-
   def replicaToBrokerAssignmentAsScala(map: util.Map[Integer, 
util.List[Integer]]): Map[Int, Seq[Int]] = {
     map.asScala.map(e => (e._1.asInstanceOf[Int], 
e._2.asScala.map(_.asInstanceOf[Int])))
   }
diff --git a/core/src/main/scala/kafka/utils/Implicits.scala 
b/core/src/main/scala/kafka/utils/Implicits.scala
index fbd22ec6b8c..80335992051 100644
--- a/core/src/main/scala/kafka/utils/Implicits.scala
+++ b/core/src/main/scala/kafka/utils/Implicits.scala
@@ -20,7 +20,6 @@ package kafka.utils
 import java.util
 import java.util.Properties
 
-import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 
 /**
@@ -46,22 +45,4 @@ object Implicits {
       (properties: util.Hashtable[AnyRef, AnyRef]).putAll(map.asJava)
 
   }
-
-  /**
-   * Exposes `forKeyValue` which maps to `foreachEntry` in Scala 2.13 and 
`foreach` in Scala 2.12
-   * (with the help of scala.collection.compat). `foreachEntry` avoids the 
tuple allocation and
-   * is more efficient.
-   *
-   * This was not named `foreachEntry` to avoid `unused import` warnings in 
Scala 2.13 (the implicit
-   * would not be triggered in Scala 2.13 since `Map.foreachEntry` would have 
precedence).
-   */
-  @nowarn("cat=unused-imports")
-  implicit class MapExtensionMethods[K, V](private val self: 
scala.collection.Map[K, V]) extends AnyVal {
-    import scala.collection.compat._
-    def forKeyValue[U](f: (K, V) => U): Unit = {
-      self.foreachEntry { (k, v) => f(k, v) }
-    }
-
-  }
-
 }
diff --git a/core/src/main/scala/kafka/utils/json/DecodeJson.scala 
b/core/src/main/scala/kafka/utils/json/DecodeJson.scala
index 71bfd61cee1..9c7bec0bdd1 100644
--- a/core/src/main/scala/kafka/utils/json/DecodeJson.scala
+++ b/core/src/main/scala/kafka/utils/json/DecodeJson.scala
@@ -17,10 +17,8 @@
 
 package kafka.utils.json
 
-import scala.collection.{Map, Seq}
-import scala.collection.compat._
+import scala.collection.{Factory, Map, Seq}
 import scala.jdk.CollectionConverters._
-
 import com.fasterxml.jackson.databind.{JsonMappingException, JsonNode}
 
 /**
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala 
b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index 15c95b998b6..290802c5d11 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -22,7 +22,6 @@ import kafka.common.TopicAlreadyMarkedForDeletionException
 import kafka.controller.ReplicaAssignment
 import kafka.server.{DynamicConfig, KafkaConfig}
 import kafka.utils._
-import kafka.utils.Implicits._
 import org.apache.kafka.admin.{AdminUtils, BrokerMetadata}
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.errors._
@@ -311,7 +310,7 @@ class AdminZkClient(zkClient: KafkaZkClient,
                                         expectedReplicationFactor: Int,
                                         availableBrokerIds: Set[Int]): Unit = {
 
-    replicaAssignment.forKeyValue { (partitionId, replicas) =>
+    replicaAssignment.foreachEntry { (partitionId, replicas) =>
       if (replicas.isEmpty)
         throw new InvalidReplicaAssignmentException(
           s"Cannot have replication factor of 0 for partition id 
$partitionId.")
diff --git a/core/src/test/java/kafka/admin/ConfigCommandTest.java 
b/core/src/test/java/kafka/admin/ConfigCommandTest.java
index 3535374b6e0..5ccfa525a6d 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandTest.java
@@ -79,8 +79,8 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import scala.collection.JavaConverters;
 import scala.collection.Seq;
+import scala.jdk.javaapi.CollectionConverters;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -1004,7 +1004,6 @@ public class ConfigCommandTest {
         });
     }
 
-    @SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for 
usages of JavaConverters
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void shouldAlterTopicConfig(boolean file) {
@@ -1013,7 +1012,7 @@ public class ConfigCommandTest {
         addedConfigs.put("delete.retention.ms", "1000000");
         addedConfigs.put("min.insync.replicas", "2");
         if (file) {
-            File f = 
kafka.utils.TestUtils.tempPropertiesFile(JavaConverters.mapAsScalaMap(addedConfigs));
+            File f = 
kafka.utils.TestUtils.tempPropertiesFile(CollectionConverters.asScala(addedConfigs));
             filePath = f.getPath();
         }
 
@@ -2292,8 +2291,7 @@ public class ConfigCommandTest {
         }
     }
 
-    @SuppressWarnings({"deprecation"})
     private <T> Seq<T> seq(Collection<T> seq) {
-        return 
JavaConverters.asScalaIteratorConverter(seq.iterator()).asScala().toSeq();
+        return CollectionConverters.asScala(seq).toSeq();
     }
 }
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 26ebed6bd86..ad4439faf54 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -26,8 +26,7 @@ import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, MethodSource}
 
-import java.util
-import java.util.Properties
+import java.util.{Properties, stream}
 import java.util.concurrent.atomic.AtomicInteger
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
@@ -117,38 +116,35 @@ object BaseConsumerTest {
   // * KRaft and the classic group protocol
   // * KRaft and the consumer group protocol
   def getTestQuorumAndGroupProtocolParametersAll() : 
java.util.stream.Stream[Arguments] = {
-    util.Arrays.stream(Array(
-        Arguments.of("zk", "classic"),
-        Arguments.of("kraft", "classic"),
-        Arguments.of("kraft", "consumer")
-    ))
+    stream.Stream.of(
+      Arguments.of("zk", "classic"),
+      Arguments.of("kraft", "classic"),
+      Arguments.of("kraft", "consumer")
+    )
   }
 
-  // In Scala 2.12, it is necessary to disambiguate the 
java.util.stream.Stream.of() method call
-  // in the case where there's only a single Arguments in the list. The 
following commented-out
-  // method works in Scala 2.13, but not 2.12. For this reason, tests which 
run against just a
-  // single combination are written using @CsvSource rather than the more 
elegant @MethodSource. 
-  // def getTestQuorumAndGroupProtocolParametersZkOnly() : 
java.util.stream.Stream[Arguments] = {
-  //   java.util.stream.Stream.of(
-  //       Arguments.of("zk", "classic"))
-  // }
+  def getTestQuorumAndGroupProtocolParametersZkOnly() : 
java.util.stream.Stream[Arguments] = {
+    stream.Stream.of(
+      Arguments.of("zk", "classic")
+    )
+  }
 
   // For tests that only work with the classic group protocol, we want to test 
the following combinations:
   // * ZooKeeper and the classic group protocol
   // * KRaft and the classic group protocol
   def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly() : 
java.util.stream.Stream[Arguments] = {
-    util.Arrays.stream(Array(
-        Arguments.of("zk", "classic"),
-        Arguments.of("kraft", "classic")
-    ))
+    stream.Stream.of(
+      Arguments.of("zk", "classic"),
+      Arguments.of("kraft", "classic")
+    )
   }
 
   // For tests that only work with the consumer group protocol, we want to 
test the following combination:
   // * KRaft and the consumer group protocol
-  def getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly(): 
java.util.stream.Stream[Arguments] = {
-    util.Arrays.stream(Array(
-        Arguments.of("kraft", "consumer")
-    ))
+  def getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly(): 
stream.Stream[Arguments] = {
+    stream.Stream.of(
+      Arguments.of("kraft", "consumer")
+    )
   }
 
   val updateProducerCount = new AtomicInteger()
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala
index 219ed9c2a1e..b4bc6f60a8e 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala
@@ -25,7 +25,6 @@ import org.apache.kafka.common.PartitionInfo
 import java.util.stream.Stream
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable
-import org.junit.jupiter.params.provider.CsvSource
 
 /**
  * Integration tests for the consumer that covers logic related to manual 
assignment.
@@ -137,9 +136,7 @@ class PlaintextConsumerAssignTest extends 
AbstractConsumerTest {
 
   // partitionsFor not implemented in consumer group protocol and this test 
requires ZK also
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  @CsvSource(Array(
-    "zk, classic"
-  ))
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersZkOnly"))
   def testAssignAndConsumeWithLeaderChangeValidatingPositions(quorum: String, 
groupProtocol: String): Unit = {
     val numRecords = 10
     val producer = createProducer()
@@ -243,4 +240,7 @@ class PlaintextConsumerAssignTest extends 
AbstractConsumerTest {
 object PlaintextConsumerAssignTest {
   def getTestQuorumAndGroupProtocolParametersAll: Stream[Arguments] =
     BaseConsumerTest.getTestQuorumAndGroupProtocolParametersAll()
+
+  def getTestQuorumAndGroupProtocolParametersZkOnly: Stream[Arguments] =
+    BaseConsumerTest.getTestQuorumAndGroupProtocolParametersZkOnly()
 }
diff --git 
a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala 
b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
index 7e357eb6e2b..62733e5a6db 100644
--- a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
+++ b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer
 import java.util.{Collections, Optional, Properties}
 import kafka.network.SocketServer
 import kafka.security.JaasTestUtils
-import kafka.utils.Implicits._
 import org.apache.kafka.clients.admin.{Admin, NewTopic}
 import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
 import org.apache.kafka.common.protocol.ApiKeys
@@ -120,7 +119,7 @@ object IntegrationTestUtils {
     replicaAssignment: Map[Int, Seq[Int]]
   ): Unit = {
     val javaAssignment = new java.util.HashMap[Integer, 
java.util.List[Integer]]()
-    replicaAssignment.forKeyValue { (partitionId, assignment) =>
+    replicaAssignment.foreachEntry { (partitionId, assignment) =>
       javaAssignment.put(partitionId, assignment.map(Int.box).asJava)
     }
     val newTopic = new NewTopic(topic, javaAssignment)
diff --git a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala 
b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
index 66971c426f7..97a4b5d7019 100644
--- a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
@@ -24,7 +24,6 @@ import java.util.{Collections, Properties}
 import com.yammer.metrics.core.Meter
 import kafka.network.Processor.ListenerMetricTag
 import kafka.server.KafkaConfig
-import kafka.utils.Implicits.MapExtensionMethods
 import kafka.utils.TestUtils
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.metrics.internals.MetricsUtils
@@ -760,7 +759,7 @@ class ConnectionQuotasTest {
       "Expected broker-connection-accept-rate metric to exist")
 
     // add listeners and verify connection limits not exceeded
-    listeners.forKeyValue { (name, listener) =>
+    listeners.foreachEntry { (name, listener) =>
       val listenerName = listener.listenerName
       connectionQuotas.addListener(config, listenerName)
       
connectionQuotas.maxConnectionsPerListener(listenerName).configure(listenerConfig)
@@ -785,14 +784,14 @@ class ConnectionQuotasTest {
   }
 
   private def verifyNoBlockedPercentRecordedOnAllListeners(): Unit = {
-    blockedPercentMeters.forKeyValue { (name, meter) =>
+    blockedPercentMeters.foreachEntry { (name, meter) =>
       assertEquals(0, meter.count(),
         s"BlockedPercentMeter metric for $name listener")
     }
   }
 
   private def verifyNonZeroBlockedPercentAndThrottleTimeOnAllListeners(): Unit 
= {
-    blockedPercentMeters.forKeyValue { (name, meter) =>
+    blockedPercentMeters.foreachEntry { (name, meter) =>
       assertTrue(meter.count() > 0,
         s"Expected BlockedPercentMeter metric for $name listener to be 
recorded")
     }
@@ -808,7 +807,7 @@ class ConnectionQuotasTest {
   }
 
   private def verifyOnlyNonInterBrokerListenersBlockedPercentRecorded(): Unit 
= {
-    blockedPercentMeters.forKeyValue { (name, meter) =>
+    blockedPercentMeters.foreachEntry { (name, meter) =>
       name match {
         case "REPLICATION" =>
           assertEquals(0, meter.count(), s"BlockedPercentMeter metric for 
$name listener")
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
index 45086961342..b3f514f9629 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
@@ -19,7 +19,6 @@ package kafka.server
 import com.yammer.metrics.core.Gauge
 import kafka.cluster.BrokerEndPoint
 import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
-import kafka.utils.Implicits.MapExtensionMethods
 import kafka.utils.TestUtils
 import org.apache.kafka.common.message.FetchResponseData.PartitionData
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
@@ -255,7 +254,7 @@ class AbstractFetcherManagerTest {
       fetcherManager.resizeThreadPool(newFetcherSize)
 
       val ownedPartitions = mutable.Set.empty[TopicPartition]
-      fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId, 
fetcherThread) =>
+      fetcherManager.fetcherThreadMap.foreachEntry { (brokerIdAndFetcherId, 
fetcherThread) =>
         val fetcherId = brokerIdAndFetcherId.fetcherId
         val brokerId = brokerIdAndFetcherId.brokerId
 
diff --git 
a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala
index 3f27059ce8b..20d9b70a1ae 100644
--- a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import com.yammer.metrics.core.{Histogram, Meter}
-import kafka.utils.Implicits.MapExtensionMethods
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.{ClientResponse, NetworkClient}
 import org.apache.kafka.common.errors.{AuthenticationException, 
SaslAuthenticationException, UnsupportedVersionException}
@@ -92,7 +91,7 @@ class AddPartitionsToTxnManagerTest {
   }
 
   private def setErrors(errors: mutable.Map[TopicPartition, 
Errors])(callbackErrors: Map[TopicPartition, Errors]): Unit = {
-    callbackErrors.forKeyValue(errors.put)
+    callbackErrors.foreachEntry(errors.put)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala 
b/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
index d2d16718e6f..0d63f872e9d 100644
--- a/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
+++ b/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
@@ -20,7 +20,6 @@ package kafka.server
 import kafka.cluster.BrokerEndPoint
 import kafka.server.AbstractFetcherThread.ReplicaFetch
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
-import kafka.utils.Implicits.MapExtensionMethods
 import org.apache.kafka.common.message.FetchResponseData
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -139,7 +138,7 @@ class MockLeaderEndPoint(sourceBroker: BrokerEndPoint = new 
BrokerEndPoint(1, ho
 
   override def fetchEpochEndOffsets(partitions: Map[TopicPartition, 
EpochData]): Map[TopicPartition, EpochEndOffset] = {
     val endOffsets = mutable.Map[TopicPartition, EpochEndOffset]()
-    partitions.forKeyValue { (partition, epochData) =>
+    partitions.foreachEntry { (partition, epochData) =>
       assert(partition.partition == epochData.partition,
         "Partition must be consistent between TopicPartition and EpochData")
       val leaderState = leaderPartitionState(partition)
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 81fc5ca422b..661a638aa88 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -1314,7 +1314,6 @@ class ReplicaFetcherThreadTest {
     when(partition.localLogOrException).thenReturn(log)
     when(partition.appendRecordsToFollowerOrFutureReplica(any[MemoryRecords], 
any[Boolean])).thenReturn(appendInfo)
 
-    // In Scala 2.12, the partitionsWithNewHighWatermark buffer is cleared 
before the replicaManager mock is verified.
     // Capture the argument at the time of invocation.
     val completeDelayedFetchRequestsArgument = 
mutable.Buffer.empty[TopicPartition]
     val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
diff --git 
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index cb73a53c3d2..0bb1c9661cb 100644
--- 
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -19,7 +19,6 @@ package kafka.server.epoch
 import kafka.cluster.BrokerEndPoint
 import kafka.server.KafkaConfig._
 import kafka.server._
-import kafka.utils.Implicits._
 import kafka.utils.TestUtils._
 import kafka.utils.{Logging, TestUtils}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
@@ -311,7 +310,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness 
with Logging {
 
     def leaderOffsetsFor(partitions: Map[TopicPartition, Int]): 
Map[TopicPartition, EpochEndOffset] = {
       val topics = new OffsetForLeaderTopicCollection(partitions.size)
-      partitions.forKeyValue { (topicPartition, leaderEpoch) =>
+      partitions.foreachEntry { (topicPartition, leaderEpoch) =>
         var topic = topics.find(topicPartition.topic)
         if (topic == null) {
           topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 326bb1ed918..15ed5c2c17d 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -421,7 +421,7 @@ object TestUtils extends Logging {
         topic, numPartitions, replicationFactor.toShort).configs(configsMap)))
     } else {
       val assignment = new util.HashMap[Integer, util.List[Integer]]()
-      replicaAssignment.forKeyValue { case (k, v) =>
+      replicaAssignment.foreachEntry { case (k, v) =>
         val replicas = new util.ArrayList[Integer]
         v.foreach(r => replicas.add(r.asInstanceOf[Integer]))
         assignment.put(k.asInstanceOf[Integer], replicas)
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 96fb97fda8f..4c81bf00d46 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -86,6 +86,10 @@
         </li>
         <li>Other changes:
             <ul>
+                <li>
+                    Scala 2.12 support has been removed in Apache Kafka 4.0
+                    See <a 
href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218";>KIP-751</a>
 for more details
+                </li>
                 <li>The <code>--delete-config</code> option in the 
<code>kafka-topics</code> command line tool has been deprecated.
                 </li>
             </ul>
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 8bb417a7362..45248cc7e28 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -20,29 +20,22 @@
 ext {
   versions = [:]
   libs = [:]
-
-  // Available if -PscalaVersion is used. This is useful when we want to 
support a Scala version that has
-  // a higher minimum Java requirement than Kafka. This was previously the 
case for Scala 2.12 and Java 7.
-  availableScalaVersions = [ '2.12', '2.13' ]
 }
 
 // Add Scala version
-def defaultScala212Version = '2.12.19'
 def defaultScala213Version = '2.13.15'
 if (hasProperty('scalaVersion')) {
-  if (scalaVersion == '2.12') {
-    versions["scala"] = defaultScala212Version
-  } else if (scalaVersion == '2.13') {
+  if (scalaVersion == '2.13') {
     versions["scala"] = defaultScala213Version
   }  else {
     versions["scala"] = scalaVersion
   }
 } else {
-  versions["scala"] = defaultScala212Version
+  versions["scala"] = defaultScala213Version
 }
 
 /* Resolve base Scala version according to these patterns:
- 1. generally available Scala versions (such as: 2.12.y and 2.13.z) 
corresponding base versions will be: 2.12 and 2.13 (respectively)
+ 1. generally available Scala versions (such as: 2.13.z) corresponding base 
versions will be: 2.13 (respectively)
  2. pre-release Scala versions (i.e. milestone/rc, such as: 2.13.0-M5, 
2.13.0-RC1, 2.14.0-M1, etc.) will have identical base versions;
     rationale: pre-release Scala versions are not binary compatible with each 
other and that's the reason why libraries include the full
     Scala release string in their name for pre-releases (see dependencies 
below with an artifact name suffix '_$versions.baseScala')
@@ -54,13 +47,9 @@ if ( !versions.scala.contains('-') ) {
 }
 
 // mockito >= 5.5 is required for Java 21 and mockito 5.x requires at least 
Java 11
-// mockito 4.9 is required for Scala 2.12 as a workaround for compiler errors 
due to ambiguous reference to `Mockito.spy`
-// since Scala 2.12 support is going away soon, this is simpler than adjusting 
the code.
 // mockito 4.11 is used with Java 8 and Scala 2.13
 String mockitoVersion
-if (scalaVersion == "2.12")
-  mockitoVersion = "4.9.0"
-else if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_11))
+if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_11))
   mockitoVersion = "5.10.0"
 else
   mockitoVersion = "4.11.0"
@@ -146,7 +135,6 @@ versions += [
   pcollections: "4.0.1",
   reload4j: "1.2.25",
   rocksDB: "7.9.2",
-  scalaCollectionCompat: "2.10.0",
   // When updating the scalafmt version please also update the version field 
in checkstyle/.scalafmt.conf. scalafmt now
   // has the version field as mandatory in its configuration, see
   // https://github.com/scalameta/scalafmt/releases/tag/v3.1.0.
@@ -246,7 +234,6 @@ libs += [
   opentelemetryProto: 
"io.opentelemetry.proto:opentelemetry-proto:$versions.opentelemetryProto",
   reload4j: "ch.qos.reload4j:reload4j:$versions.reload4j",
   rocksDBJni: "org.rocksdb:rocksdbjni:$versions.rocksDB",
-  scalaCollectionCompat: 
"org.scala-lang.modules:scala-collection-compat_$versions.baseScala:$versions.scalaCollectionCompat",
   scalaJava8Compat: 
"org.scala-lang.modules:scala-java8-compat_$versions.baseScala:$versions.scalaJava8Compat",
   scalaLibrary: "org.scala-lang:scala-library:$versions.scala",
   scalaLogging: 
"com.typesafe.scala-logging:scala-logging_$versions.baseScala:$versions.scalaLogging",
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index bf540c4d7aa..e38ed4e21bf 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -184,68 +184,12 @@ For a detailed description of spotbugs bug categories, 
see https://spotbugs.read
         <Bug pattern="BX_UNBOXING_IMMEDIATELY_REBOXED"/>
     </Match>
 
-    <Match>
-        <!-- Uncallable anonymous methods are left behind after inlining by 
scalac 2.12, fixed in 2.13 -->
-        <Source name="AdminUtils.scala"/>
-        <Package name="kafka.admin"/>
-        <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
-    </Match>
-
-    <Match>
-        <!-- Uncallable anonymous methods are left behind after inlining by 
scalac 2.12, fixed in 2.13 -->
-        <Source name="ControllerContext.scala"/>
-        <Package name="kafka.controller"/>
-        <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
-    </Match>
-
     <Match>
         <!-- offsets is a lazy val and it confuses spotBugs with its locking 
scheme -->
         <Class name="kafka.server.checkpoints.LazyOffsetCheckpointMap"/>
         <Bug pattern="IS2_INCONSISTENT_SYNC"/>
     </Match>
 
-    <Match>
-        <!-- Uncallable anonymous methods are left behind after inlining by 
scalac 2.12, fixed in 2.13 -->
-        <Source name="ReplicaManager.scala"/>
-        <Package name="kafka.server"/>
-        <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
-    </Match>
-
-    <Match>
-        <!-- Uncallable anonymous methods are left behind after inlining by 
scalac 2.12, fixed in 2.13 -->
-        <Source name="LogManager.scala"/>
-        <Package name="kafka.log"/>
-        <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
-    </Match>
-
-    <Match>
-        <!-- Uncallable anonymous methods are left behind after inlining by 
scalac 2.12, fixed in 2.13 -->
-        <Source name="DelayedElectLeader.scala"/>
-        <Package name="kafka.server"/>
-        <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
-    </Match>
-
-    <Match>
-        <!-- Uncallable anonymous methods are left behind after inlining by 
scalac 2.12, fixed in 2.13 -->
-        <Source name="AdminZkClient.scala"/>
-        <Package name="kafka.zk"/>
-        <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
-    </Match>
-
-    <Match>
-        <!-- Uncallable anonymous methods are left behind after inlining by 
scalac 2.12, fixed in 2.13 -->
-        <Source name="AuthHelper.scala"/>
-        <Package name="kafka.server"/>
-        <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
-    </Match>
-
-    <Match>
-        <!-- Uncallable anonymous methods are left behind after inlining by 
scalac 2.12, fixed in 2.13 -->
-        <Source name="KafkaApis.scala"/>
-        <Package name="kafka.server"/>
-        <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
-    </Match>
-
     <Match>
         <!-- Keeping this class for compatibility. It's deprecated and will be 
removed in the next major release -->
         <Source name="MessageFormatter.scala"/>
diff --git a/gradlewAll b/gradlewAll
index c09e9b4a485..21d6d7b7525 100755
--- a/gradlewAll
+++ b/gradlewAll
@@ -17,5 +17,9 @@
 
 # Convenient way to invoke a gradle command with all Scala versions supported
 # by default
-./gradlew "$@" -PscalaVersion=2.12 && ./gradlew "$@" -PscalaVersion=2.13
+# This script was originally designed to support multiple Scala versions (2.12 
and 2.13),
+# but as Scala 2.12 is no longer supported, this script is no longer necessary.
+# We are keeping it for backwards compatibility. It will be removed in a 
future release.
+echo "Warning: This script is deprecated and will be removed in a future 
release."
+./gradlew "$@" -PscalaVersion=2.13
 
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ConcurrentMapBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ConcurrentMapBenchmark.java
index 776ca9b5816..7d7716cae2e 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ConcurrentMapBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ConcurrentMapBenchmark.java
@@ -189,4 +189,23 @@ public class ConcurrentMapBenchmark {
             }
         }
     }
+
+    @Benchmark
+    @OperationsPerInvocation(TIMES)
+    public void testConcurrentHashMapComputeIfAbsentReadOnly(Blackhole 
blackhole) {
+        for (int i = 0; i < TIMES; i++) {
+            blackhole.consume(concurrentHashMap.computeIfAbsent(
+                    ThreadLocalRandom.current().nextInt(0, mapSize),
+                    newValue -> Integer.MAX_VALUE
+            ));
+        }
+    }
+
+    @Benchmark
+    @OperationsPerInvocation(TIMES)
+    public void testConcurrentHashMapGetReadOnly(Blackhole blackhole) {
+        for (int i = 0; i < TIMES; i++) {
+            
blackhole.consume(concurrentHashMap.get(ThreadLocalRandom.current().nextInt(0, 
mapSize)));
+        }
+    }
 }
diff --git a/release/release.py b/release/release.py
index 7710789bba8..67fc46c831d 100644
--- a/release/release.py
+++ b/release/release.py
@@ -326,7 +326,7 @@ except Exception as e:
 
 
 git.targz(rc_tag, f"kafka-{release_version}-src/", 
f"{artifacts_dir}/kafka-{release_version}-src.tgz")
-cmd("Building artifacts", "./gradlew clean && ./gradlewAll releaseTarGz", 
cwd=kafka_dir, env=jdk8_env, shell=True)
+cmd("Building artifacts", "./gradlew clean && ./gradlew releaseTarGz 
-PscalaVersion=2.13", cwd=kafka_dir, env=jdk8_env, shell=True)
 cmd("Copying artifacts", f"cp {kafka_dir}/core/build/distributions/* 
{artifacts_dir}", shell=True)
 cmd("Building docs", "./gradlew clean aggregatedJavadoc", cwd=kafka_dir, 
env=jdk17_env)
 cmd("Copying docs", f"cp -R {kafka_dir}/build/docs/javadoc {artifacts_dir}")
@@ -351,7 +351,7 @@ cmd("Zipping artifacts", f"tar -czf {artifact_name}.tar.gz 
{artifact_name}", cwd
 sftp.upload_artifacts(apache_id, artifacts_dir)
 
 confirm_or_fail("Going to build and upload mvn artifacts based on these 
settings:\n" + textfiles.read(global_gradle_props) + '\nOK?')
-cmd("Building and uploading archives", "./gradlewAll publish", cwd=kafka_dir, 
env=jdk8_env, shell=True)
+cmd("Building and uploading archives", "./gradlew publish 
-PscalaVersion=2.13", cwd=kafka_dir, env=jdk8_env, shell=True)
 cmd("Building and uploading archives", "mvn deploy -Pgpg-signing", 
cwd=os.path.join(kafka_dir, "streams/quickstart"), env=jdk8_env, shell=True)
 
 # TODO: Many of these suggested validation steps could be automated
diff --git 
a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
 
b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index b9cbb9f3a76..08b12ff442a 100644
--- 
a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++ 
b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -77,7 +77,6 @@ import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LIS
 import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIRS_CONFIG;
 
 
-@SuppressWarnings("deprecation") // Needed for Scala 2.12 compatibility
 public class KafkaClusterTestKit implements AutoCloseable {
     private static final Logger log = 
LoggerFactory.getLogger(KafkaClusterTestKit.class);
 

Reply via email to