This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0e4eebe9c00 KAFKA-12895 Drop support for Scala 2.12 in Kafka 4.0 (#17313)
0e4eebe9c00 is described below
commit 0e4eebe9c000e9eefad6461fb502f59615d7318b
Author: TengYao Chi <[email protected]>
AuthorDate: Mon Oct 7 01:34:38 2024 +0800
KAFKA-12895 Drop support for Scala 2.12 in Kafka 4.0 (#17313)
Reviewers: Chia-Ping Tsai <[email protected]>
---
LICENSE-binary | 7 ---
README.md | 51 +-------------------
build.gradle | 31 ++----------
.../src/main/scala/kafka/admin/ConfigCommand.scala | 2 +-
.../scala/kafka/admin/ZkSecurityMigrator.scala | 3 +-
.../controller/ControllerChannelManager.scala | 13 +++--
.../scala/kafka/controller/ControllerContext.scala | 3 +-
.../scala/kafka/controller/KafkaController.scala | 17 ++++---
.../kafka/controller/PartitionStateMachine.scala | 3 +-
.../kafka/controller/ReplicaStateMachine.scala | 5 +-
.../group/GroupCoordinatorAdapter.scala | 11 ++---
.../kafka/coordinator/group/GroupMetadata.scala | 9 ++--
.../coordinator/group/GroupMetadataManager.scala | 13 +++--
.../TransactionMarkerChannelManager.scala | 3 +-
.../transaction/TransactionStateManager.scala | 7 ++-
core/src/main/scala/kafka/log/LogManager.scala | 3 +-
.../main/scala/kafka/network/RequestChannel.scala | 3 +-
.../kafka/security/authorizer/AclAuthorizer.scala | 13 +++--
.../kafka/server/AbstractFetcherManager.scala | 5 +-
.../scala/kafka/server/AbstractFetcherThread.scala | 7 ++-
.../kafka/server/AddPartitionsToTxnManager.scala | 3 +-
core/src/main/scala/kafka/server/AuthHelper.scala | 3 +-
.../main/scala/kafka/server/ConfigHandler.scala | 3 +-
.../scala/kafka/server/DelayedDeleteRecords.scala | 7 ++-
.../main/scala/kafka/server/DelayedProduce.scala | 7 ++-
.../kafka/server/DelayedRemoteListOffsets.scala | 7 ++-
.../scala/kafka/server/DynamicBrokerConfig.scala | 13 +++--
core/src/main/scala/kafka/server/KafkaApis.scala | 15 +++---
.../scala/kafka/server/RemoteLeaderEndPoint.scala | 5 +-
.../main/scala/kafka/server/ReplicaManager.scala | 27 +++++------
.../main/scala/kafka/server/ZkAdminManager.scala | 5 +-
.../kafka/server/metadata/ZkMetadataCache.scala | 5 +-
.../main/scala/kafka/tools/DumpLogSegments.scala | 5 +-
core/src/main/scala/kafka/utils/CoreUtils.scala | 8 ----
core/src/main/scala/kafka/utils/Implicits.scala | 19 --------
.../main/scala/kafka/utils/json/DecodeJson.scala | 4 +-
core/src/main/scala/kafka/zk/AdminZkClient.scala | 3 +-
.../test/java/kafka/admin/ConfigCommandTest.java | 8 ++--
.../integration/kafka/api/BaseConsumerTest.scala | 42 ++++++++--------
.../kafka/api/PlaintextConsumerAssignTest.scala | 8 ++--
.../kafka/server/IntegrationTestUtils.scala | 3 +-
.../unit/kafka/network/ConnectionQuotasTest.scala | 9 ++--
.../kafka/server/AbstractFetcherManagerTest.scala | 3 +-
.../server/AddPartitionsToTxnManagerTest.scala | 3 +-
.../unit/kafka/server/MockLeaderEndPoint.scala | 3 +-
.../kafka/server/ReplicaFetcherThreadTest.scala | 1 -
.../server/epoch/LeaderEpochIntegrationTest.scala | 3 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 2 +-
docs/upgrade.html | 4 ++
gradle/dependencies.gradle | 21 ++------
gradle/spotbugs-exclude.xml | 56 ----------------------
gradlewAll | 6 ++-
.../kafka/jmh/util/ConcurrentMapBenchmark.java | 19 ++++++++
release/release.py | 4 +-
.../kafka/common/test/KafkaClusterTestKit.java | 1 -
55 files changed, 175 insertions(+), 369 deletions(-)
diff --git a/LICENSE-binary b/LICENSE-binary
index 611909cba67..d067f3ffb52 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -226,7 +226,6 @@ jackson-jaxrs-json-provider-2.16.2
jackson-module-afterburner-2.16.2
jackson-module-jaxb-annotations-2.16.2
jackson-module-scala_2.13-2.16.2
-jackson-module-scala_2.12-2.16.2
jakarta.validation-api-2.0.2
javassist-3.29.2-GA
jetty-client-9.4.54.v20240208
@@ -257,15 +256,9 @@ opentelemetry-proto-1.0.0-alpha
plexus-utils-3.5.1
reload4j-1.2.25
rocksdbjni-7.9.2
-scala-collection-compat_2.12-2.10.0
-scala-collection-compat_2.13-2.10.0
-scala-library-2.12.19
scala-library-2.13.15
-scala-logging_2.12-3.9.5
scala-logging_2.13-3.9.5
-scala-reflect-2.12.19
scala-reflect-2.13.15
-scala-java8-compat_2.12-1.0.2
scala-java8-compat_2.13-1.0.2
snappy-java-1.1.10.5
swagger-annotations-2.2.8
diff --git a/README.md b/README.md
index 6e90cac5c52..0be7bff7afa 100644
--- a/README.md
+++ b/README.md
@@ -11,9 +11,7 @@ the broker and tools has been deprecated since Apache Kafka
3.7 and removal of b
see [KIP-750](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308223) and [KIP-1013](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789510) for more details).
-Scala 2.12 and 2.13 are supported and 2.13 is used by default. Scala 2.12 support has been deprecated since
-Apache Kafka 3.0 and will be removed in Apache Kafka 4.0 (see [KIP-751](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218)
-for more details). See below for how to use a specific Scala version or all of the supported Scala versions.
+Scala 2.13 is the only supported version in Apache Kafka.
### Build a jar and run it ###
./gradlew jar
@@ -122,23 +120,6 @@ Using compiled files:
### Cleaning the build ###
./gradlew clean
-### Running a task with one of the Scala versions available (2.12.x or 2.13.x) ###
-*Note that if building the jars with a version other than 2.13.x, you need to set the `SCALA_VERSION` variable or change it in `bin/kafka-run-class.sh` to run the quick start.*
-
-You can pass either the major version (eg 2.12) or the full version (eg 2.12.7):
-
- ./gradlew -PscalaVersion=2.12 jar
- ./gradlew -PscalaVersion=2.12 test
- ./gradlew -PscalaVersion=2.12 releaseTarGz
-
-### Running a task with all the scala versions enabled by default ###
-
-Invoke the `gradlewAll` script followed by the task(s):
-
- ./gradlewAll test
- ./gradlewAll jar
- ./gradlewAll releaseTarGz
-
### Running a task for a specific project ###
This is for `core`, `examples` and `clients`
@@ -162,24 +143,6 @@ The `eclipse` task has been configured to use
`${project_dir}/build_eclipse` as
build directory (`${project_dir}/bin`) clashes with Kafka's scripts directory
and we don't use Gradle's build directory
to avoid known issues with this configuration.
-### Publishing the jar for all versions of Scala and for all projects to maven ###
-The recommended command is:
-
- ./gradlewAll publish
-
-For backwards compatibility, the following also works:
-
- ./gradlewAll uploadArchives
-
-Please note for this to work you should create/update `${GRADLE_USER_HOME}/gradle.properties` (typically, `~/.gradle/gradle.properties`) and assign the following variables
-
- mavenUrl=
- mavenUsername=
- mavenPassword=
- signing.keyId=
- signing.password=
- signing.secretKeyRingFile=
-
### Publishing the streams quickstart archetype artifact to maven ###
For the Streams archetype project, one cannot use gradle to upload to maven;
instead the `mvn deploy` command needs to be called at the quickstart folder:
@@ -209,22 +172,10 @@ Please note for this to work you should create/update
user maven settings (typic
</servers>
...
-
-### Installing ALL the jars to the local Maven repository ###
-The recommended command to build for both Scala 2.12 and 2.13 is:
-
- ./gradlewAll publishToMavenLocal
-
-For backwards compatibility, the following also works:
-
- ./gradlewAll install
-
### Installing specific projects to the local Maven repository ###
./gradlew -PskipSigning=true :streams:publishToMavenLocal
-If needed, you can specify the Scala version with `-PscalaVersion=2.13`.
-
### Building the test jar ###
./gradlew testJar
diff --git a/build.gradle b/build.gradle
index ec55a370a7b..242143794e4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -773,25 +773,11 @@ subprojects {
scalaCompileOptions.additionalParameters += inlineFrom
}
- if (versions.baseScala != '2.12') {
- scalaCompileOptions.additionalParameters += ["-opt-warnings",
"-Xlint:strict-unsealed-patmat"]
- // Scala 2.13.2 introduces compiler warnings suppression, which is a
pre-requisite for -Xfatal-warnings
- scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"]
- }
-
- // these options are valid for Scala versions < 2.13 only
- // Scala 2.13 removes them, see https://github.com/scala/scala/pull/6502
and https://github.com/scala/scala/pull/5969
- if (versions.baseScala == '2.12') {
- scalaCompileOptions.additionalParameters += [
- "-Xlint:by-name-right-associative",
- "-Xlint:nullary-override",
- "-Xlint:unsound-match"
- ]
- }
+ scalaCompileOptions.additionalParameters += ["-opt-warnings",
"-Xlint:strict-unsealed-patmat"]
+ // Scala 2.13.2 introduces compiler warnings suppression, which is a
pre-requisite for -Xfatal-warnings
+ scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"]
- // Scalac 2.12 `-release` requires Java 9 or higher, but Scala 2.13
doesn't have that restriction
- if (versions.baseScala == "2.13" ||
JavaVersion.current().isJava9Compatible())
- scalaCompileOptions.additionalParameters += ["-release",
String.valueOf(minJavaVersion)]
+ scalaCompileOptions.additionalParameters += ["-release",
String.valueOf(minJavaVersion)]
addParametersForTests(name, options)
@@ -1096,7 +1082,6 @@ project(':core') {
implementation libs.joptSimple
implementation libs.jose4j
implementation libs.metrics
- implementation libs.scalaCollectionCompat
implementation libs.scalaJava8Compat
// only needed transitively, but set it explicitly to ensure it has the
same version as scala-library
implementation libs.scalaReflect
@@ -2813,14 +2798,6 @@ project(':streams:streams-scala') {
api project(':streams')
api libs.scalaLibrary
- if ( versions.baseScala == '2.12' ) {
- // Scala-Collection-Compat isn't required when compiling with Scala 2.13
or later,
- // and having it in the dependencies could lead to classpath conflicts
in Scala 3
- // projects that use kafka-streams-kafka_2.13 (because we don't have a
Scala 3 version yet)
- // but also pull in scala-collection-compat_3 via another dependency.
- // So we make sure to not include it in the dependencies.
- api libs.scalaCollectionCompat
- }
testImplementation project(':group-coordinator')
testImplementation project(':core')
testImplementation project(':test-common')
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 6602d879759..4722311f6c0 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -628,7 +628,7 @@ object ConfigCommand extends Logging {
private def describeQuotaConfigs(adminClient: Admin, entityTypes:
List[String], entityNames: List[String]): Unit = {
val quotaConfigs = getAllClientQuotasConfigs(adminClient, entityTypes,
entityNames)
- quotaConfigs.forKeyValue { (entity, entries) =>
+ quotaConfigs.foreachEntry { (entity, entries) =>
val entityEntries = entity.entries.asScala
def entitySubstr(entityType: String): Option[String] =
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index 33fec627486..77662c3b114 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -19,7 +19,6 @@ package kafka.admin
import joptsimple.{OptionSet, OptionSpec, OptionSpecBuilder}
import kafka.server.KafkaConfig
-import kafka.utils.Implicits._
import kafka.utils.{Logging, ToolsUtils}
import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData,
ZkSecurityMigratorUtils}
import org.apache.kafka.common.security.JaasUtils
@@ -130,7 +129,7 @@ object ZkSecurityMigrator extends Logging {
// Now override any set system properties with explicitly-provided values
from the config file
// Emit INFO logs due to camel-case property names encouraging mistakes --
help people see mistakes they make
info(s"Found ${zkTlsConfigFileProps.size()} ZooKeeper client configuration
properties in file $filename")
- zkTlsConfigFileProps.asScala.forKeyValue { (key, value) =>
+ zkTlsConfigFileProps.asScala.foreachEntry { (key, value) =>
info(s"Setting $key")
KafkaConfig.setZooKeeperClientProperty(zkClientConfig, key, value)
}
diff --git
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 27ca3c5e02e..cea7368378d 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -19,7 +19,6 @@ package kafka.controller
import com.yammer.metrics.core.{Gauge, Timer}
import kafka.cluster.Broker
import kafka.server.KafkaConfig
-import kafka.utils.Implicits._
import kafka.utils._
import org.apache.kafka.clients._
import org.apache.kafka.common._
@@ -524,11 +523,11 @@ abstract class
AbstractControllerBrokerRequestBatch(config: KafkaConfig,
else if (metadataVersion.isAtLeast(IBP_1_0_IV0)) 1
else 0
- leaderAndIsrRequestMap.forKeyValue { (broker, leaderAndIsrPartitionStates)
=>
+ leaderAndIsrRequestMap.foreachEntry { (broker,
leaderAndIsrPartitionStates) =>
if (metadataInstance.liveOrShuttingDownBrokerIds.contains(broker)) {
val leaderIds = mutable.Set.empty[Int]
var numBecomeLeaders = 0
- leaderAndIsrPartitionStates.forKeyValue { (topicPartition, state) =>
+ leaderAndIsrPartitionStates.foreachEntry { (topicPartition, state) =>
leaderIds += state.leader
val typeOfRequest = if (broker == state.leader) {
numBecomeLeaders += 1
@@ -669,10 +668,10 @@ abstract class
AbstractControllerBrokerRequestBatch(config: KafkaConfig,
handleStopReplicaResponse(stopReplicaResponse, brokerId,
partitionErrorsForDeletingTopics.toMap)
}
- stopReplicaRequestMap.forKeyValue { (brokerId, partitionStates) =>
+ stopReplicaRequestMap.foreachEntry { (brokerId, partitionStates) =>
if (metadataInstance.liveOrShuttingDownBrokerIds.contains(brokerId)) {
if (traceEnabled)
- partitionStates.forKeyValue { (topicPartition, partitionState) =>
+ partitionStates.foreachEntry { (topicPartition, partitionState) =>
stateChangeLog.trace(s"Sending StopReplica request $partitionState
to " +
s"broker $brokerId for partition $topicPartition")
}
@@ -680,7 +679,7 @@ abstract class AbstractControllerBrokerRequestBatch(config:
KafkaConfig,
val brokerEpoch = metadataInstance.liveBrokerIdAndEpochs(brokerId)
if (stopReplicaRequestVersion >= 3) {
val stopReplicaTopicState = mutable.Map.empty[String,
StopReplicaTopicState]
- partitionStates.forKeyValue { (topicPartition, partitionState) =>
+ partitionStates.foreachEntry { (topicPartition, partitionState) =>
val topicState =
stopReplicaTopicState.getOrElseUpdate(topicPartition.topic,
new StopReplicaTopicState().setTopicName(topicPartition.topic))
topicState.partitionStates().add(partitionState)
@@ -699,7 +698,7 @@ abstract class AbstractControllerBrokerRequestBatch(config:
KafkaConfig,
val topicStatesWithDelete = mutable.Map.empty[String,
StopReplicaTopicState]
val topicStatesWithoutDelete = mutable.Map.empty[String,
StopReplicaTopicState]
- partitionStates.forKeyValue { (topicPartition, partitionState) =>
+ partitionStates.foreachEntry { (topicPartition, partitionState) =>
val topicStates = if (partitionState.deletePartition()) {
numPartitionStateWithDelete += 1
topicStatesWithDelete
diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala
b/core/src/main/scala/kafka/controller/ControllerContext.scala
index 2df4e0f78f0..10420716416 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -18,7 +18,6 @@
package kafka.controller
import kafka.cluster.Broker
-import kafka.utils.Implicits._
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderAndIsr
@@ -522,7 +521,7 @@ class ControllerContext extends ControllerChannelContext {
}
private def cleanPreferredReplicaImbalanceMetric(topic: String): Unit = {
- partitionAssignments.getOrElse(topic, mutable.Map.empty).forKeyValue {
(partition, replicaAssignment) =>
+ partitionAssignments.getOrElse(topic, mutable.Map.empty).foreachEntry {
(partition, replicaAssignment) =>
partitionLeadershipInfo.get(new TopicPartition(topic,
partition)).foreach { leadershipInfo =>
if (!hasPreferredLeader(replicaAssignment, leadershipInfo))
preferredReplicaImbalanceCount -= 1
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala
b/core/src/main/scala/kafka/controller/KafkaController.scala
index bd4df02ca3b..0f9d65ebb1c 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -26,7 +26,6 @@ import kafka.coordinator.transaction.ZkProducerIdManager
import kafka.server._
import kafka.server.metadata.ZkFinalizedFeatureCache
import kafka.utils._
-import kafka.utils.Implicits._
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import kafka.zk.TopicZNode.TopicIdReplicaAssignment
import kafka.zk.{FeatureZNodeStatus, _}
@@ -1030,7 +1029,7 @@ class KafkaController(val config: KafkaConfig,
private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] =
controllerContext.allPartitions.toSeq): Unit = {
val leaderIsrAndControllerEpochs =
zkClient.getTopicPartitionStates(partitions)
- leaderIsrAndControllerEpochs.forKeyValue { (partition,
leaderIsrAndControllerEpoch) =>
+ leaderIsrAndControllerEpochs.foreachEntry { (partition,
leaderIsrAndControllerEpoch) =>
controllerContext.putPartitionLeadershipInfo(partition,
leaderIsrAndControllerEpoch)
}
}
@@ -1297,7 +1296,7 @@ class KafkaController(val config: KafkaConfig,
}.toMap.groupBy { case (_, assignedReplicas) => assignedReplicas.head }
// for each broker, check if a preferred replica election needs to be
triggered
- preferredReplicasForTopicsByBrokers.forKeyValue { (leaderBroker,
topicPartitionsForBroker) =>
+ preferredReplicasForTopicsByBrokers.foreachEntry { (leaderBroker,
topicPartitionsForBroker) =>
val topicsNotInPreferredReplica = topicPartitionsForBroker.filter { case
(topicPartition, _) =>
val leadershipInfo =
controllerContext.partitionLeadershipInfo(topicPartition)
leadershipInfo.exists(_.leaderAndIsr.leader != leaderBroker)
@@ -1776,7 +1775,7 @@ class KafkaController(val config: KafkaConfig,
}
} else if (partitionsToBeAdded.nonEmpty) {
info(s"New partitions to be added $partitionsToBeAdded")
- partitionsToBeAdded.forKeyValue { (topicPartition, assignedReplicas) =>
+ partitionsToBeAdded.foreachEntry { (topicPartition, assignedReplicas) =>
controllerContext.updatePartitionFullReplicaAssignment(topicPartition,
assignedReplicas)
}
onNewPartitionCreation(partitionsToBeAdded.keySet)
@@ -1821,7 +1820,7 @@ class KafkaController(val config: KafkaConfig,
val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
val partitionsToReassign = mutable.Map.empty[TopicPartition,
ReplicaAssignment]
- zkClient.getPartitionReassignment.forKeyValue { (tp, targetReplicas) =>
+ zkClient.getPartitionReassignment.foreachEntry { (tp, targetReplicas) =>
maybeBuildReassignment(tp, Some(targetReplicas)) match {
case Some(context) => partitionsToReassign.put(tp, context)
case None => reassignmentResults.put(tp, new
ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
@@ -1858,7 +1857,7 @@ class KafkaController(val config: KafkaConfig,
val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
val partitionsToReassign = mutable.Map.empty[TopicPartition,
ReplicaAssignment]
- reassignments.forKeyValue { (tp, targetReplicas) =>
+ reassignments.foreachEntry { (tp, targetReplicas) =>
val maybeApiError = targetReplicas.flatMap(validateReplicas(tp, _))
maybeApiError match {
case None =>
@@ -2304,7 +2303,7 @@ class KafkaController(val config: KafkaConfig,
// After we have returned the result of the `AlterPartition` request, we
should check whether
// there are any reassignments which can be completed by a successful ISR
expansion.
- partitionResponses.forKeyValue { (topicPartition, partitionResponse) =>
+ partitionResponses.foreachEntry { (topicPartition, partitionResponse) =>
if
(controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
val isSuccessfulUpdate = partitionResponse.isRight
if (isSuccessfulUpdate) {
@@ -2480,7 +2479,7 @@ class KafkaController(val config: KafkaConfig,
partitionsToAlter.keySet
)
- partitionResponses.groupBy(_._1.topic).forKeyValue { (topicName,
partitionResponses) =>
+ partitionResponses.groupBy(_._1.topic).foreachEntry { (topicName,
partitionResponses) =>
// Add each topic part to the response
val topicResponse = if (useTopicsIds) {
new AlterPartitionResponseData.TopicData()
@@ -2491,7 +2490,7 @@ class KafkaController(val config: KafkaConfig,
}
alterPartitionResponse.topics.add(topicResponse)
- partitionResponses.forKeyValue { (tp, errorOrIsr) =>
+ partitionResponses.foreachEntry { (tp, errorOrIsr) =>
// Add each partition part to the response (new ISR or error)
errorOrIsr match {
case Left(error) =>
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 2af5381d9b1..c0b92b9c638 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -19,7 +19,6 @@ package kafka.controller
import kafka.common.StateChangeFailedException
import kafka.controller.Election._
import kafka.server.KafkaConfig
-import kafka.utils.Implicits._
import kafka.utils.Logging
import kafka.zk.KafkaZkClient
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
@@ -437,7 +436,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result =>
result.topicPartition -> result.leaderAndIsr.get).toMap
val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) =
zkClient.updateLeaderAndIsr(
adjustedLeaderAndIsrs, controllerContext.epoch,
controllerContext.epochZkVersion)
- finishedUpdates.forKeyValue { (partition, result) =>
+ finishedUpdates.foreachEntry { (partition, result) =>
result.foreach { leaderAndIsr =>
val replicaAssignment =
controllerContext.partitionFullReplicaAssignment(partition)
val leaderIsrAndControllerEpoch =
LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 5a299a469c0..406fff2b51b 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -18,7 +18,6 @@ package kafka.controller
import kafka.common.StateChangeFailedException
import kafka.server.KafkaConfig
-import kafka.utils.Implicits._
import kafka.utils.Logging
import kafka.zk.KafkaZkClient
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
@@ -110,7 +109,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
if (replicas.nonEmpty) {
try {
controllerBrokerRequestBatch.newBatch()
- replicas.groupBy(_.replica).forKeyValue { (replicaId, replicas) =>
+ replicas.groupBy(_.replica).foreachEntry { (replicaId, replicas) =>
doHandleStateChanges(replicaId, replicas, targetState)
}
controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
@@ -227,7 +226,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
controllerContext.partitionLeadershipInfo(replica.topicPartition).isDefined
}
val updatedLeaderIsrAndControllerEpochs =
removeReplicasFromIsr(replicaId,
replicasWithLeadershipInfo.map(_.topicPartition))
- updatedLeaderIsrAndControllerEpochs.forKeyValue { (partition,
leaderIsrAndControllerEpoch) =>
+ updatedLeaderIsrAndControllerEpochs.foreachEntry { (partition,
leaderIsrAndControllerEpoch) =>
stateLogger.info(s"Partition $partition state changed to
$leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as
part of transition to $OfflineReplica")
if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
val recipients =
controllerContext.partitionReplicaAssignment(partition).filterNot(_ ==
replicaId)
diff --git
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index 01756550569..fcf5766b577 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -18,7 +18,6 @@ package kafka.coordinator.group
import kafka.common.OffsetAndMetadata
import kafka.server.{KafkaConfig, ReplicaManager}
-import kafka.utils.Implicits.MapExtensionMethods
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData,
ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData,
DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData,
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData,
LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData,
ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData,
OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...]
import org.apache.kafka.common.metrics.Metrics
@@ -281,7 +280,7 @@ private[group] class GroupCoordinatorAdapter(
coordinator.handleDeleteGroups(
groupIds.asScala.toSet,
new RequestLocal(bufferSupplier)
- ).forKeyValue { (groupId, error) =>
+ ).foreachEntry { (groupId, error) =>
results.add(new DeleteGroupsResponseData.DeletableGroupResult()
.setGroupId(groupId)
.setErrorCode(error.code))
@@ -338,7 +337,7 @@ private[group] class GroupCoordinatorAdapter(
val topicsList = new
util.ArrayList[OffsetFetchResponseData.OffsetFetchResponseTopics]()
val topicsMap = new mutable.HashMap[String,
OffsetFetchResponseData.OffsetFetchResponseTopics]()
- results.forKeyValue { (tp, offset) =>
+ results.foreachEntry { (tp, offset) =>
val topic = topicsMap.get(tp.topic) match {
case Some(topic) =>
topic
@@ -378,7 +377,7 @@ private[group] class GroupCoordinatorAdapter(
val response = new OffsetCommitResponseData()
val byTopics = new mutable.HashMap[String,
OffsetCommitResponseData.OffsetCommitResponseTopic]()
- commitStatus.forKeyValue { (tp, error) =>
+ commitStatus.foreachEntry { (tp, error) =>
val topic = byTopics.get(tp.topic) match {
case Some(existingTopic) =>
existingTopic
@@ -445,7 +444,7 @@ private[group] class GroupCoordinatorAdapter(
val response = new TxnOffsetCommitResponseData()
val byTopics = new mutable.HashMap[String,
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic]()
- results.forKeyValue { (tp, error) =>
+ results.foreachEntry { (tp, error) =>
val topic = byTopics.get(tp.topic) match {
case Some(existingTopic) =>
existingTopic
@@ -546,7 +545,7 @@ private[group] class GroupCoordinatorAdapter(
future.completeExceptionally(groupError.exception)
} else {
val response = new OffsetDeleteResponseData()
- topicPartitionResults.forKeyValue { (topicPartition, error) =>
+ topicPartitionResults.foreachEntry { (topicPartition, error) =>
var topic = response.topics.find(topicPartition.topic)
if (topic == null) {
topic = new
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setName(topicPartition.topic)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 5bf69e9aa32..7d74512b712 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.locks.ReentrantLock
import kafka.common.OffsetAndMetadata
import kafka.utils.{CoreUtils, Logging, nonthreadsafe}
-import kafka.utils.Implicits._
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
import
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
@@ -651,7 +650,7 @@ private[group] class GroupMetadata(val groupId: String,
initialState: GroupState
def prepareOffsetCommit(offsets: Map[TopicIdPartition, OffsetAndMetadata]):
Unit = {
receivedConsumerOffsetCommits = true
- offsets.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
+ offsets.foreachEntry { (topicIdPartition, offsetAndMetadata) =>
pendingOffsetCommits += topicIdPartition.topicPartition ->
offsetAndMetadata
}
}
@@ -662,7 +661,7 @@ private[group] class GroupMetadata(val groupId: String,
initialState: GroupState
val producerOffsets =
pendingTransactionalOffsetCommits.getOrElseUpdate(producerId,
mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
- offsets.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
+ offsets.foreachEntry { (topicIdPartition, offsetAndMetadata) =>
producerOffsets.put(topicIdPartition.topicPartition,
CommitRecordMetadataAndOffset(None, offsetAndMetadata))
}
}
@@ -708,7 +707,7 @@ private[group] class GroupMetadata(val groupId: String,
initialState: GroupState
val pendingOffsetsOpt =
pendingTransactionalOffsetCommits.remove(producerId)
if (isCommit) {
pendingOffsetsOpt.foreach { pendingOffsets =>
- pendingOffsets.forKeyValue { (topicPartition,
commitRecordMetadataAndOffset) =>
+ pendingOffsets.foreachEntry { (topicPartition,
commitRecordMetadataAndOffset) =>
if (commitRecordMetadataAndOffset.appendedBatchOffset.isEmpty)
throw new IllegalStateException(s"Trying to complete a
transactional offset commit for producerId $producerId " +
s"and groupId $groupId even though the offset commit record
itself hasn't been appended to the log.")
@@ -746,7 +745,7 @@ private[group] class GroupMetadata(val groupId: String,
initialState: GroupState
def removeOffsets(topicPartitions: Seq[TopicPartition]):
immutable.Map[TopicPartition, OffsetAndMetadata] = {
topicPartitions.flatMap { topicPartition =>
pendingOffsetCommits.remove(topicPartition)
- pendingTransactionalOffsetCommits.forKeyValue { (_, pendingOffsets) =>
+ pendingTransactionalOffsetCommits.foreachEntry { (_, pendingOffsets) =>
pendingOffsets.remove(topicPartition)
}
val removedOffset = offsets.remove(topicPartition)
diff --git
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 21bbae6487b..8db98654b94 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -30,7 +30,6 @@ import kafka.common.OffsetAndMetadata
import
kafka.coordinator.group.GroupMetadataManager.maybeConvertOffsetCommitError
import kafka.server.ReplicaManager
import kafka.utils.CoreUtils.inLock
-import kafka.utils.Implicits._
import kafka.utils._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.compress.Compression
@@ -393,7 +392,7 @@ class GroupMetadataManager(brokerId: Int,
val responseError = group.inLock {
if (status.error == Errors.NONE) {
if (!group.is(Dead)) {
- filteredOffsetMetadata.forKeyValue { (topicIdPartition,
offsetAndMetadata) =>
+ filteredOffsetMetadata.foreachEntry { (topicIdPartition,
offsetAndMetadata) =>
if (isTxnOffsetCommit)
group.onTxnOffsetCommitAppend(producerId, topicIdPartition,
CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
else
@@ -409,7 +408,7 @@ class GroupMetadataManager(brokerId: Int,
if (!group.is(Dead)) {
if (!group.hasPendingOffsetCommitsFromProducer(producerId))
removeProducerGroup(producerId, group.groupId)
- filteredOffsetMetadata.forKeyValue { (topicIdPartition,
offsetAndMetadata) =>
+ filteredOffsetMetadata.foreachEntry { (topicIdPartition,
offsetAndMetadata) =>
if (isTxnOffsetCommit)
group.failPendingTxnOffsetCommit(producerId, topicIdPartition)
else
@@ -705,11 +704,11 @@ class GroupMetadataManager(brokerId: Int,
}.partition { case (group, _) => loadedGroups.contains(group) }
val pendingOffsetsByGroup = mutable.Map[String, mutable.Map[Long,
mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]]()
- pendingOffsets.forKeyValue { (producerId, producerOffsets) =>
+ pendingOffsets.foreachEntry { (producerId, producerOffsets) =>
producerOffsets.keySet.map(_.group).foreach(addProducerGroup(producerId, _))
producerOffsets
.groupBy(_._1.group)
- .forKeyValue { (group, offsets) =>
+ .foreachEntry { (group, offsets) =>
val groupPendingOffsets =
pendingOffsetsByGroup.getOrElseUpdate(group, mutable.Map.empty[Long,
mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
val groupProducerOffsets =
groupPendingOffsets.getOrElseUpdate(producerId,
mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
groupProducerOffsets ++= offsets.map { case
(groupTopicPartition, offset) =>
@@ -878,7 +877,7 @@ class GroupMetadataManager(brokerId: Int,
replicaManager.onlinePartition(appendPartition).foreach { partition
=>
val tombstones = ArrayBuffer.empty[SimpleRecord]
- removedOffsets.forKeyValue { (topicPartition, offsetAndMetadata) =>
+ removedOffsets.foreachEntry { (topicPartition, offsetAndMetadata)
=>
trace(s"Removing expired/deleted offset and metadata for
$groupId, $topicPartition: $offsetAndMetadata")
val commitKey = GroupMetadataManager.offsetCommitKey(groupId,
topicPartition)
tombstones += new SimpleRecord(timestamp, commitKey, null)
@@ -971,7 +970,7 @@ class GroupMetadataManager(brokerId: Int,
}
private def removeGroupFromAllProducers(groupId: String): Unit =
openGroupsForProducer synchronized {
- openGroupsForProducer.forKeyValue { (_, groups) =>
+ openGroupsForProducer.foreachEntry { (_, groups) =>
groups.remove(groupId)
}
}
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 410d7f044ee..afd645877d1 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -22,7 +22,6 @@ import
kafka.coordinator.transaction.TransactionMarkerChannelManager.{LogAppendR
import java.util
import java.util.concurrent.{BlockingQueue, ConcurrentHashMap,
LinkedBlockingQueue}
import kafka.server.{KafkaConfig, MetadataCache}
-import kafka.utils.Implicits._
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.clients._
import org.apache.kafka.common.metrics.Metrics
@@ -147,7 +146,7 @@ class TxnMarkerQueue(@volatile var destination: Node)
extends Logging {
}
def forEachTxnTopicPartition[B](f:(Int,
BlockingQueue[PendingCompleteTxnAndMarkerEntry]) => B): Unit =
- markersPerTxnTopicPartition.forKeyValue { (partition, queue) =>
+ markersPerTxnTopicPartition.foreachEntry { (partition, queue) =>
if (!queue.isEmpty) f(partition, queue)
}
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 094c4d950cc..5abd9768767 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.server.{MetadataCache, ReplicaManager}
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils.{Logging, Pool}
-import kafka.utils.Implicits._
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.ListTransactionsResponseData
@@ -237,7 +236,7 @@ class TransactionStateManager(brokerId: Int,
private[transaction] def removeExpiredTransactionalIds(): Unit = {
inReadLock(stateLock) {
- transactionMetadataCache.forKeyValue { (partitionId,
partitionCacheEntry) =>
+ transactionMetadataCache.foreachEntry { (partitionId,
partitionCacheEntry) =>
val transactionPartition = new
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
removeExpiredTransactionalIds(transactionPartition,
partitionCacheEntry)
}
@@ -250,7 +249,7 @@ class TransactionStateManager(brokerId: Int,
tombstoneRecords: MemoryRecords
): Unit = {
def removeFromCacheCallback(responses: collection.Map[TopicPartition,
PartitionResponse]): Unit = {
- responses.forKeyValue { (topicPartition, response) =>
+ responses.foreachEntry { (topicPartition, response) =>
inReadLock(stateLock) {
transactionMetadataCache.get(topicPartition.partition).foreach {
txnMetadataCacheEntry =>
expiredForPartition.foreach { idCoordinatorEpochAndMetadata =>
@@ -345,7 +344,7 @@ class TransactionStateManager(brokerId: Int,
}
val states = new
java.util.ArrayList[ListTransactionsResponseData.TransactionState]
- transactionMetadataCache.forKeyValue { (_, cache) =>
+ transactionMetadataCache.foreachEntry { (_, cache) =>
cache.metadataPerTransactionalId.values.foreach { txnMetadata =>
txnMetadata.inLock {
if (shouldInclude(txnMetadata)) {
diff --git a/core/src/main/scala/kafka/log/LogManager.scala
b/core/src/main/scala/kafka/log/LogManager.scala
index 176e510c6d0..a0e68364138 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -35,7 +35,6 @@ import scala.jdk.CollectionConverters._
import scala.collection._
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
-import kafka.utils.Implicits._
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.requests.{AbstractControlRequest,
LeaderAndIsrRequest}
import org.apache.kafka.image.TopicsImage
@@ -699,7 +698,7 @@ class LogManager(logDirs: Seq[File],
}
try {
- jobs.forKeyValue { (dir, dirJobs) =>
+ jobs.foreachEntry { (dir, dirJobs) =>
if (waitForAllToComplete(dirJobs,
e => warn(s"There was an error in one of the threads during
LogManager shutdown: ${e.getCause}"))) {
val logs = logsInDir(localLogsByDir, dir)
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala
b/core/src/main/scala/kafka/network/RequestChannel.scala
index ca4ab90957b..fc215adb3aa 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -24,7 +24,6 @@ import com.typesafe.scalalogging.Logger
import kafka.network
import kafka.server.KafkaConfig
import kafka.utils.Logging
-import kafka.utils.Implicits._
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.EnvelopeResponseData
@@ -465,7 +464,7 @@ class RequestChannel(val queueSize: Int,
requestQueue.take()
def updateErrorMetrics(apiKey: ApiKeys, errors: collection.Map[Errors,
Integer]): Unit = {
- errors.forKeyValue { (error, count) =>
+ errors.foreachEntry { (error, count) =>
metrics(apiKey.name).markErrorMeter(error, count)
}
}
diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
index dd5df2f2da4..501c6e2ece3 100644
--- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.{CompletableFuture,
CompletionStage}
import com.typesafe.scalalogging.Logger
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils._
-import kafka.utils.Implicits._
import kafka.zk._
import org.apache.kafka.common.Endpoint
import org.apache.kafka.common.acl._
@@ -107,7 +106,7 @@ object AclAuthorizer {
// be sure to force creation since the zkSslClientEnable property in the
kafkaConfig could be false
val zkClientConfig =
KafkaServer.zkClientConfigFromKafkaConfig(kafkaConfig, forceZkSslClientEnable =
true)
// add in any prefixed overlays
- ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.forKeyValue {
(kafkaProp, sysProp) =>
+ ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.foreachEntry {
(kafkaProp, sysProp) =>
configMap.get(AclAuthorizer.configPrefix + kafkaProp).foreach {
prefixedValue =>
zkClientConfig.setProperty(sysProp,
if (kafkaProp ==
ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)
@@ -187,7 +186,7 @@ class AclAuthorizer extends Authorizer with Logging {
override def configure(javaConfigs: util.Map[String, _]): Unit = {
val configs = javaConfigs.asScala
val props = new java.util.Properties()
- configs.forKeyValue { (key, value) => props.put(key, value.toString.trim) }
+ configs.foreachEntry { (key, value) => props.put(key, value.toString.trim)
}
superUsers = configs.get(AclAuthorizer.SuperUsersProp).collect {
case str: String if str.nonEmpty => str.split(";").map(s =>
SecurityUtils.parseKafkaPrincipal(s.trim)).toSet
@@ -251,7 +250,7 @@ class AclAuthorizer extends Authorizer with Logging {
if (aclsToCreate.nonEmpty) {
lock synchronized {
- aclsToCreate.forKeyValue { (resource, aclsWithIndex) =>
+ aclsToCreate.foreachEntry { (resource, aclsWithIndex) =>
try {
updateResourceAcls(resource) { currentAcls =>
val newAcls = aclsWithIndex.map { case (acl, _) => new
AclEntry(acl.entry) }
@@ -299,7 +298,7 @@ class AclAuthorizer extends Authorizer with Logging {
resource -> matchingFilters
}.toMap.filter(_._2.nonEmpty)
- resourcesToUpdate.forKeyValue { (resource, matchingFilters) =>
+ resourcesToUpdate.foreachEntry { (resource, matchingFilters) =>
val resourceBindingsBeingDeleted = new mutable.HashMap[AclBinding,
Int]()
try {
updateResourceAcls(resource) { currentAcls =>
@@ -334,7 +333,7 @@ class AclAuthorizer extends Authorizer with Logging {
override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = {
val aclBindings = new util.ArrayList[AclBinding]()
- aclCache.forKeyValue { case (resource, versionedAcls) =>
+ aclCache.foreachEntry { case (resource, versionedAcls) =>
versionedAcls.acls.foreach { acl =>
val binding = new AclBinding(resource, acl.ace)
if (filter.matches(binding))
@@ -552,7 +551,7 @@ class AclAuthorizer extends Authorizer with Logging {
aclCacheSnapshot
.from(new ResourcePattern(resourceType, resourceName,
PatternType.PREFIXED))
.to(new ResourcePattern(resourceType, resourceName.take(1),
PatternType.PREFIXED))
- .forKeyValue { (resource, acls) =>
+ .foreachEntry { (resource, acls) =>
if (resourceName.startsWith(resource.name)) prefixed ++= acls.acls
}
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index eced213734b..c584c987b01 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -18,7 +18,6 @@
package kafka.server
import kafka.cluster.BrokerEndPoint
-import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.utils.Utils
@@ -66,11 +65,11 @@ abstract class AbstractFetcherManager[T <:
AbstractFetcherThread](val name: Stri
def resizeThreadPool(newSize: Int): Unit = {
def migratePartitions(newSize: Int): Unit = {
val allRemovedPartitionsMap = mutable.Map[TopicPartition,
InitialFetchState]()
- fetcherThreadMap.forKeyValue { (id, thread) =>
+ fetcherThreadMap.foreachEntry { (id, thread) =>
val partitionStates = thread.removeAllPartitions()
if (id.fetcherId >= newSize)
thread.shutdown()
- partitionStates.forKeyValue { (topicPartition, currentFetchState) =>
+ partitionStates.foreachEntry { (topicPartition, currentFetchState) =>
val initialFetchState =
InitialFetchState(currentFetchState.topicId, thread.leader.brokerEndPoint(),
currentLeaderEpoch = currentFetchState.currentLeaderEpoch,
initOffset = currentFetchState.fetchOffset)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 119f9d01bae..73c89a94399 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -20,7 +20,6 @@ package kafka.server
import com.yammer.metrics.core.Meter
import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
import kafka.utils.CoreUtils.inLock
-import kafka.utils.Implicits._
import kafka.utils.{DelayedItem, Logging, Pool}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.PartitionStates
@@ -256,7 +255,7 @@ abstract class AbstractFetcherThread(name: String,
val fetchOffsets = mutable.HashMap.empty[TopicPartition,
OffsetTruncationState]
val partitionsWithError = mutable.HashSet.empty[TopicPartition]
- fetchedEpochs.forKeyValue { (tp, leaderEpochOffset) =>
+ fetchedEpochs.foreachEntry { (tp, leaderEpochOffset) =>
if (partitionStates.contains(tp)) {
Errors.forCode(leaderEpochOffset.errorCode) match {
case Errors.NONE =>
@@ -329,7 +328,7 @@ abstract class AbstractFetcherThread(name: String,
if (responseData.nonEmpty) {
// process fetched data
inLock(partitionMapLock) {
- responseData.forKeyValue { (topicPartition, partitionData) =>
+ responseData.foreachEntry { (topicPartition, partitionData) =>
Option(partitionStates.stateValue(topicPartition)).foreach {
currentFetchState =>
// It's possible that a partition is removed and re-added or
truncated when there is a pending fetch request.
// In this case, we only want to process the fetch response if the
partition state is ready for fetch and
@@ -508,7 +507,7 @@ abstract class AbstractFetcherThread(name: String,
try {
failedPartitions.removeAll(initialFetchStates.keySet)
- initialFetchStates.forKeyValue { (tp, initialFetchState) =>
+ initialFetchStates.foreachEntry { (tp, initialFetchState) =>
val currentState = partitionStates.stateValue(tp)
val updatedState = partitionFetchState(tp, initialFetchState,
currentState)
partitionStates.updateAndMoveToEnd(tp, updatedState)
diff --git a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala
b/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala
index 90e8114583b..0b680e034b2 100644
--- a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala
+++ b/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala
@@ -18,7 +18,6 @@
package kafka.server
import
kafka.server.AddPartitionsToTxnManager.{VerificationFailureRateMetricName,
VerificationTimeMsMetricName}
-import kafka.utils.Implicits.MapExtensionMethods
import kafka.utils.Logging
import org.apache.kafka.clients.{ClientResponse, NetworkClient,
RequestCompletionHandler}
import org.apache.kafka.common.internals.Topic
@@ -99,7 +98,7 @@ class AddPartitionsToTxnManager(
callback(topicPartitions.map(tp => tp ->
Errors.COORDINATOR_NOT_AVAILABLE).toMap)
} else {
val topicCollection = new AddPartitionsToTxnTopicCollection()
- topicPartitions.groupBy(_.topic).forKeyValue { (topic, tps) =>
+ topicPartitions.groupBy(_.topic).foreachEntry { (topic, tps) =>
topicCollection.add(new AddPartitionsToTxnTopic()
.setName(topic)
.setPartitions(tps.map(tp => Int.box(tp.partition)).toList.asJava))
diff --git a/core/src/main/scala/kafka/server/AuthHelper.scala
b/core/src/main/scala/kafka/server/AuthHelper.scala
index 6e779ce9007..4d21fb43859 100644
--- a/core/src/main/scala/kafka/server/AuthHelper.scala
+++ b/core/src/main/scala/kafka/server/AuthHelper.scala
@@ -20,7 +20,6 @@ package kafka.server
import java.lang.{Byte => JByte}
import java.util.Collections
import kafka.network.RequestChannel
-import kafka.utils.CoreUtils
import org.apache.kafka.clients.admin.EndpointType
import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.acl.AclOperation.DESCRIBE
@@ -105,7 +104,7 @@ class AuthHelper(authorizer: Option[Authorizer]) {
logIfDenied: Boolean = true)(resourceName: T =>
String): Set[String] = {
authorizer match {
case Some(authZ) =>
- val resourceNameToCount =
CoreUtils.groupMapReduce(resources)(resourceName)(_ => 1)(_ + _)
+ val resourceNameToCount = resources.groupMapReduce(resourceName)(_ =>
1)(_ + _)
val actions = resourceNameToCount.iterator.map { case (resourceName,
count) =>
val resource = new ResourcePattern(resourceType, resourceName,
PatternType.LITERAL)
new Action(operation, resource, count, logIfAllowed, logIfDenied)
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 77a3874a3dd..8e8c603f1d2 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -24,7 +24,6 @@ import kafka.log.UnifiedLog
import kafka.network.ConnectionQuotas
import kafka.server.Constants._
import kafka.server.QuotaFactory.QuotaManagers
-import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.server.config.{QuotaConfigs, ReplicationConfigs,
ZooKeeperInternals}
import org.apache.kafka.common.config.TopicConfig
@@ -64,7 +63,7 @@ class TopicConfigHandler(private val replicaManager:
ReplicaManager,
// Validate the configurations.
val configNamesToExclude = excludedConfigs(topic, topicConfig)
val props = new Properties()
- topicConfig.asScala.forKeyValue { (key, value) =>
+ topicConfig.asScala.foreachEntry { (key, value) =>
if (!configNamesToExclude.contains(key)) props.put(key, value)
}
diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
index 6d25bd1f5c8..0375cc8a072 100644
--- a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
+++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
@@ -20,7 +20,6 @@ package kafka.server
import java.util.concurrent.TimeUnit
-import kafka.utils.Implicits._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.DeleteRecordsResponseData
import org.apache.kafka.common.protocol.Errors
@@ -49,7 +48,7 @@ class DelayedDeleteRecords(delayMs: Long,
extends DelayedOperation(delayMs) {
// first update the acks pending variable according to the error code
- deleteRecordsStatus.forKeyValue { (topicPartition, status) =>
+ deleteRecordsStatus.foreachEntry { (topicPartition, status) =>
if (status.responseStatus.errorCode == Errors.NONE.code) {
// Timeout error state will be cleared when required acks are received
status.acksPending = true
@@ -70,7 +69,7 @@ class DelayedDeleteRecords(delayMs: Long,
*/
override def tryComplete(): Boolean = {
// check for each partition if it still has pending acks
- deleteRecordsStatus.forKeyValue { (topicPartition, status) =>
+ deleteRecordsStatus.foreachEntry { (topicPartition, status) =>
trace(s"Checking delete records satisfaction for $topicPartition,
current status $status")
// skip those partitions that have already been satisfied
if (status.acksPending) {
@@ -106,7 +105,7 @@ class DelayedDeleteRecords(delayMs: Long,
}
override def onExpiration(): Unit = {
- deleteRecordsStatus.forKeyValue { (topicPartition, status) =>
+ deleteRecordsStatus.foreachEntry { (topicPartition, status) =>
if (status.acksPending) {
DelayedDeleteRecordsMetrics.recordExpiration(topicPartition)
}
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala
b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 7a21a86260c..c6470c81358 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.Lock
import com.typesafe.scalalogging.Logger
import com.yammer.metrics.core.Meter
-import kafka.utils.Implicits._
import kafka.utils.Pool
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
@@ -65,7 +64,7 @@ class DelayedProduce(delayMs: Long,
override lazy val logger: Logger = DelayedProduce.logger
// first update the acks pending variable according to the error code
- produceMetadata.produceStatus.forKeyValue { (topicPartition, status) =>
+ produceMetadata.produceStatus.foreachEntry { (topicPartition, status) =>
if (status.responseStatus.error == Errors.NONE) {
// Timeout error state will be cleared when required acks are received
status.acksPending = true
@@ -90,7 +89,7 @@ class DelayedProduce(delayMs: Long,
*/
override def tryComplete(): Boolean = {
// check for each partition if it still has pending acks
- produceMetadata.produceStatus.forKeyValue { (topicPartition, status) =>
+ produceMetadata.produceStatus.foreachEntry { (topicPartition, status) =>
trace(s"Checking produce satisfaction for $topicPartition, current
status $status")
// skip those partitions that have already been satisfied
if (status.acksPending) {
@@ -119,7 +118,7 @@ class DelayedProduce(delayMs: Long,
}
override def onExpiration(): Unit = {
- produceMetadata.produceStatus.forKeyValue { (topicPartition, status) =>
+ produceMetadata.produceStatus.foreachEntry { (topicPartition, status) =>
if (status.acksPending) {
debug(s"Expiring produce request for partition $topicPartition with
status $status")
DelayedProduceMetrics.recordExpiration(topicPartition)
diff --git a/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala
b/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala
index 0da3cc20764..8fca0226a0b 100644
--- a/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala
+++ b/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala
@@ -18,7 +18,6 @@ package kafka.server
import com.yammer.metrics.core.Meter
import kafka.log.AsyncOffsetReadFutureHolder
-import kafka.utils.Implicits._
import kafka.utils.Pool
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.ApiException
@@ -58,7 +57,7 @@ class DelayedRemoteListOffsets(delayMs: Long,
// Mark the status as completed, if there is no async task to track.
// If there is a task to track, then build the response as REQUEST_TIMED_OUT
by default.
- metadata.statusByPartition.forKeyValue { (topicPartition, status) =>
+ metadata.statusByPartition.foreachEntry { (topicPartition, status) =>
status.completed = status.futureHolderOpt.isEmpty
if (status.futureHolderOpt.isDefined) {
status.responseOpt = Some(buildErrorResponse(Errors.REQUEST_TIMED_OUT,
topicPartition.partition()))
@@ -70,7 +69,7 @@ class DelayedRemoteListOffsets(delayMs: Long,
* Call-back to execute when a delayed operation gets expired and hence
forced to complete.
*/
override def onExpiration(): Unit = {
- metadata.statusByPartition.forKeyValue { (topicPartition, status) =>
+ metadata.statusByPartition.foreachEntry { (topicPartition, status) =>
if (!status.completed) {
debug(s"Expiring list offset request for partition $topicPartition
with status $status")
status.futureHolderOpt.foreach(futureHolder =>
futureHolder.jobFuture.cancel(true))
@@ -100,7 +99,7 @@ class DelayedRemoteListOffsets(delayMs: Long,
*/
override def tryComplete(): Boolean = {
var completable = true
- metadata.statusByPartition.forKeyValue { (partition, status) =>
+ metadata.statusByPartition.foreachEntry { (partition, status) =>
if (!status.completed) {
status.futureHolderOpt.foreach { futureHolder =>
if (futureHolder.taskFuture.isDone) {
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index ffc338f7d18..f93f4c82349 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -26,7 +26,6 @@ import kafka.log.{LogCleaner, LogManager}
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.server.DynamicBrokerConfig._
import kafka.utils.{CoreUtils, Logging}
-import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
@@ -399,7 +398,7 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
props.setProperty(configName, passwordEncoder.encode(new
Password(value)))
}
}
- configProps.asScala.forKeyValue { (name, value) =>
+ configProps.asScala.foreachEntry { (name, value) =>
if (isPasswordConfig(name))
encodePassword(name, value)
}
@@ -436,7 +435,7 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
}
}
- props.asScala.forKeyValue { (name, value) =>
+ props.asScala.foreachEntry { (name, value) =>
if (isPasswordConfig(name))
decodePassword(name, value)
}
@@ -451,7 +450,7 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
val props = persistentProps.clone().asInstanceOf[Properties]
if (props.asScala.keySet.exists(isPasswordConfig)) {
maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderOldSecret).foreach
{ passwordDecoder =>
- persistentProps.asScala.forKeyValue { (configName, value) =>
+ persistentProps.asScala.foreachEntry { (configName, value) =>
if (isPasswordConfig(configName) && value != null) {
val decoded = try {
Some(passwordDecoder.decode(value).value)
@@ -545,7 +544,7 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
* `props` (even though `log.roll.hours` is secondary to `log.roll.ms`).
*/
private def overrideProps(props: mutable.Map[String, String], propsOverride:
mutable.Map[String, String]): Unit = {
- propsOverride.forKeyValue { (k, v) =>
+ propsOverride.foreachEntry { (k, v) =>
// Remove synonyms of `k` to ensure the right precedence is applied. But
disable `matchListenerOverride`
// so that base configs corresponding to listener configs are not
removed. Base configs should not be removed
// since they may be used by other listeners. It is ok to retain them in
`props` since base configs cannot be
@@ -916,7 +915,7 @@ class DynamicMetricReporterState(brokerId: Int, config:
KafkaConfig, metrics: Me
updatedConfigs: util.Map[String, _]): Unit = {
val props = new util.HashMap[String, AnyRef]
updatedConfigs.forEach((k, v) => props.put(k, v.asInstanceOf[AnyRef]))
- propsOverride.forKeyValue((k, v) => props.put(k, v))
+ propsOverride.foreachEntry((k, v) => props.put(k, v))
val reporters =
dynamicConfig.currentKafkaConfig.getConfiguredInstances(reporterClasses,
classOf[MetricsReporter], props)
// Call notifyMetricsReporters first to satisfy the contract for
MetricsReporter.contextChange,
@@ -1058,7 +1057,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends
BrokerReconfigurable wi
newAdvertisedListeners: Map[ListenerName, EndPoint]
): Boolean = {
if (oldAdvertisedListeners.size != newAdvertisedListeners.size) return true
- oldAdvertisedListeners.forKeyValue {
+ oldAdvertisedListeners.foreachEntry {
case (oldListenerName, oldEndpoint) =>
newAdvertisedListeners.get(oldListenerName) match {
case None => return true
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2f508ef6dc7..c2f24381258 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -24,7 +24,6 @@ import kafka.server.QuotaFactory.{QuotaManagers,
UnboundedQuota}
import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
import kafka.server.share.SharePartitionManager
-import kafka.utils.Implicits._
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.admin.AdminUtils
import org.apache.kafka.clients.CommonClientConfigs
@@ -344,7 +343,7 @@ class KafkaApis(val requestChannel: RequestChannel,
partitionStates)
// Clear the coordinator caches in case we were the leader. In the case
of a reassignment, we
// cannot rely on the LeaderAndIsr API for this since it is only sent to
active replicas.
- result.forKeyValue { (topicPartition, error) =>
+ result.foreachEntry { (topicPartition, error) =>
if (error == Errors.NONE) {
val partitionState = partitionStates(topicPartition)
if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
@@ -662,7 +661,7 @@ class KafkaApis(val requestChannel: RequestChannel,
var errorInResponse = false
val nodeEndpoints = new mutable.HashMap[Int, Node]
- mergedResponseStatus.forKeyValue { (topicPartition, status) =>
+ mergedResponseStatus.foreachEntry { (topicPartition, status) =>
if (status.error != Errors.NONE) {
errorInResponse = true
debug("Produce request with correlation id %d from client %s on
partition %s failed due to %s".format(
@@ -732,7 +731,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def processingStatsCallback(processingStats: FetchResponseStats): Unit = {
- processingStats.forKeyValue { (tp, info) =>
+ processingStats.foreachEntry { (tp, info) =>
updateRecordConversionStats(request, tp, info)
}
}
@@ -2208,7 +2207,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a DeleteRecordsResponse
def sendResponseCallback(authorizedTopicResponses: Map[TopicPartition,
DeleteRecordsPartitionResult]): Unit = {
val mergedResponseStatus = authorizedTopicResponses ++
unauthorizedTopicResponses ++ nonExistingTopicResponses
- mergedResponseStatus.forKeyValue { (topicPartition, status) =>
+ mergedResponseStatus.foreachEntry { (topicPartition, status) =>
if (status.errorCode != Errors.NONE.code) {
debug("DeleteRecordsRequest with correlation id %d from client %s on
partition %s failed due to %s".format(
request.header.correlationId,
@@ -2485,7 +2484,7 @@ class KafkaApis(val requestChannel: RequestChannel,
entriesPerPartition = controlRecords,
requestLocal = requestLocal,
responseCallback = errors => {
- errors.forKeyValue { (tp, partitionResponse) =>
+ errors.foreachEntry { (tp, partitionResponse) =>
markerResults.put(tp, partitionResponse.error)
}
maybeComplete()
@@ -3328,11 +3327,11 @@ class KafkaApis(val requestChannel: RequestChannel,
val electionResults = new util.ArrayList[ReplicaElectionResult]()
adjustedResults
.groupBy { case (tp, _) => tp.topic }
- .forKeyValue { (topic, ps) =>
+ .foreachEntry { (topic, ps) =>
val electionResult = new ReplicaElectionResult()
electionResult.setTopic(topic)
- ps.forKeyValue { (topicPartition, error) =>
+ ps.foreachEntry { (topicPartition, error) =>
val partitionResult = new PartitionResult()
partitionResult.setPartitionId(topicPartition.partition)
partitionResult.setErrorCode(error.error.code)
diff --git a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
index edf514cb77e..3cdff49d408 100644
--- a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
@@ -21,7 +21,6 @@ import kafka.cluster.BrokerEndPoint
import java.util.{Collections, Optional}
import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
-import kafka.utils.Implicits.MapExtensionMethods
import kafka.utils.Logging
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.errors.KafkaStorageException
@@ -141,7 +140,7 @@ class RemoteLeaderEndPoint(logPrefix: String,
}
val topics = new OffsetForLeaderTopicCollection(partitions.size)
- partitions.forKeyValue { (topicPartition, epochData) =>
+ partitions.foreachEntry { (topicPartition, epochData) =>
var topic = topics.find(topicPartition.topic)
if (topic == null) {
topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic)
@@ -182,7 +181,7 @@ class RemoteLeaderEndPoint(logPrefix: String,
val partitionsWithError = mutable.Set[TopicPartition]()
val builder = fetchSessionHandler.newBuilder(partitions.size, false)
- partitions.forKeyValue { (topicPartition, fetchState) =>
+ partitions.foreachEntry { (topicPartition, fetchState) =>
// We will not include a replica in the fetch request if it should be
throttled.
if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota,
fetchState, topicPartition)) {
try {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 58873cad312..65da75c8865 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -25,7 +25,6 @@ import kafka.server.HostedPartition.Online
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName,
FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName,
IsrShrinksPerSecMetricName, LeaderCountMetricName,
OfflineReplicaCountMetricName, PartitionCountMetricName,
PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName,
ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName,
UnderReplicatedPartitionsMetricName, createLogReadResult,
isListOffsetsTimestampUnsupported}
import kafka.server.metadata.ZkMetadataCache
-import kafka.utils.Implicits._
import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.errors._
@@ -436,7 +435,7 @@ class ReplicaManager(val config: KafkaConfig,
// First, stop the partitions. This will shutdown the fetchers and other
managers
val partitionsToStop = strayPartitions.map(tp => StopPartition(tp,
deleteLocalLog = false)).toSet
- stopPartitions(partitionsToStop).forKeyValue { (topicPartition, exception)
=>
+ stopPartitions(partitionsToStop).foreachEntry { (topicPartition,
exception) =>
error(s"Unable to stop stray partition $topicPartition", exception)
}
@@ -512,7 +511,7 @@ class ReplicaManager(val config: KafkaConfig,
stateChangeLogger.info(s"Handling StopReplica request correlationId
$correlationId from controller " +
s"$controllerId for ${partitionStates.size} partitions")
if (stateChangeLogger.isTraceEnabled)
- partitionStates.forKeyValue { (topicPartition, partitionState) =>
+ partitionStates.foreachEntry { (topicPartition, partitionState) =>
stateChangeLogger.trace(s"Received StopReplica request
$partitionState " +
s"correlation id $correlationId from controller $controllerId " +
s"epoch $controllerEpoch for partition $topicPartition")
@@ -529,7 +528,7 @@ class ReplicaManager(val config: KafkaConfig,
this.controllerEpoch = controllerEpoch
val stoppedPartitions = mutable.Buffer.empty[StopPartition]
- partitionStates.forKeyValue { (topicPartition, partitionState) =>
+ partitionStates.foreachEntry { (topicPartition, partitionState) =>
val deletePartition = partitionState.deletePartition()
getPartition(topicPartition) match {
@@ -836,7 +835,7 @@ class ReplicaManager(val config: KafkaConfig,
val transactionalProducerInfo = mutable.HashSet[(Long, Short)]()
val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]()
- entriesPerPartition.forKeyValue { (topicPartition, records) =>
+ entriesPerPartition.foreachEntry { (topicPartition, records) =>
// Produce requests (only requests that require verification) should
only have one batch per partition in "batches" but check all just to be safe.
val transactionalBatches = records.batches.asScala.filter(batch =>
batch.hasProducerId && batch.isTransactional)
transactionalBatches.foreach(batch =>
transactionalProducerInfo.add(batch.producerId, batch.producerEpoch))
@@ -2203,14 +2202,14 @@ class ReplicaManager(val config: KafkaConfig,
val data = new
LeaderAndIsrResponseData().setErrorCode(Errors.NONE.code)
if (leaderAndIsrRequest.version < 5) {
- responseMap.forKeyValue { (tp, error) =>
+ responseMap.foreachEntry { (tp, error) =>
data.partitionErrors.add(new LeaderAndIsrPartitionError()
.setTopicName(tp.topic)
.setPartitionIndex(tp.partition)
.setErrorCode(error.code))
}
} else {
- responseMap.forKeyValue { (tp, error) =>
+ responseMap.foreachEntry { (tp, error) =>
val topicId = topicIds.get(tp.topic)
var topic = data.topics.find(topicId)
if (topic == null) {
@@ -2335,7 +2334,7 @@ class ReplicaManager(val config: KafkaConfig,
s"controller $controllerId epoch $controllerEpoch as part of the
become-leader transition for " +
s"${partitionStates.size} partitions")
// Update the partition information to be the leader
- partitionStates.forKeyValue { (partition, partitionState) =>
+ partitionStates.foreachEntry { (partition, partitionState) =>
try {
if (partition.makeLeader(partitionState, highWatermarkCheckpoints,
topicIds(partitionState.topicName))) {
partitionsToMakeLeaders += partition
@@ -2400,7 +2399,7 @@ class ReplicaManager(val config: KafkaConfig,
highWatermarkCheckpoints: OffsetCheckpoints,
topicIds: String => Option[Uuid]) : Set[Partition]
= {
val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
- partitionStates.forKeyValue { (partition, partitionState) =>
+ partitionStates.foreachEntry { (partition, partitionState) =>
if (traceLoggingEnabled)
stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId
$correlationId from controller $controllerId " +
s"epoch $controllerEpoch starting the become-follower transition for
partition ${partition.topicPartition} with leader " +
@@ -2410,7 +2409,7 @@ class ReplicaManager(val config: KafkaConfig,
val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
try {
- partitionStates.forKeyValue { (partition, partitionState) =>
+ partitionStates.foreachEntry { (partition, partitionState) =>
val newLeaderBrokerId = partitionState.leader
try {
if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
@@ -2868,7 +2867,7 @@ class ReplicaManager(val config: KafkaConfig,
}
.toSet
stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).")
- stopPartitions(deletes).forKeyValue { (topicPartition, e) =>
+ stopPartitions(deletes).foreachEntry { (topicPartition, e) =>
if (e.isInstanceOf[KafkaStorageException]) {
stateChangeLogger.error(s"Unable to delete replica $topicPartition
because " +
"the local replica for the partition is in an offline log
directory")
@@ -2918,7 +2917,7 @@ class ReplicaManager(val config: KafkaConfig,
stateChangeLogger.info(s"Transitioning ${localLeaders.size} partition(s)
to " +
"local leaders.")
replicaFetcherManager.removeFetcherForPartitions(localLeaders.keySet)
- localLeaders.forKeyValue { (tp, info) =>
+ localLeaders.foreachEntry { (tp, info) =>
getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition,
isNew) =>
try {
val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
@@ -2953,7 +2952,7 @@ class ReplicaManager(val config: KafkaConfig,
val partitionsToStartFetching = new mutable.HashMap[TopicPartition,
Partition]
val partitionsToStopFetching = new mutable.HashMap[TopicPartition, Boolean]
val followerTopicSet = new mutable.HashSet[String]
- localFollowers.forKeyValue { (tp, info) =>
+ localFollowers.foreachEntry { (tp, info) =>
getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition,
isNew) =>
try {
followerTopicSet.add(tp.topic)
@@ -3006,7 +3005,7 @@ class ReplicaManager(val config: KafkaConfig,
val listenerName = config.interBrokerListenerName.value
val partitionAndOffsets = new mutable.HashMap[TopicPartition,
InitialFetchState]
- partitionsToStartFetching.forKeyValue { (topicPartition, partition) =>
+ partitionsToStartFetching.foreachEntry { (topicPartition, partition) =>
val nodeOpt = partition.leaderReplicaIdOpt
.flatMap(leaderId => Option(newImage.cluster.broker(leaderId)))
.flatMap(_.node(listenerName).asScala)
diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala
b/core/src/main/scala/kafka/server/ZkAdminManager.scala
index 748a0776462..238c7304ad4 100644
--- a/core/src/main/scala/kafka/server/ZkAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -22,7 +22,6 @@ import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.server.ConfigAdminManager.{prepareIncrementalConfigs,
toLoggableProps}
import kafka.server.metadata.ZkConfigRepository
import kafka.utils._
-import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.admin.AdminUtils
import org.apache.kafka.clients.admin.{AlterConfigOp, ScramMechanism}
@@ -963,7 +962,7 @@ class ZkAdminManager(val config: KafkaConfig,
}
).toMap
- illegalRequestsByUser.forKeyValue { (user, errorMessage) =>
+ illegalRequestsByUser.foreachEntry { (user, errorMessage) =>
retval.results.add(new AlterUserScramCredentialsResult().setUser(user)
.setErrorCode(if (errorMessage == unknownScramMechanismMsg)
{Errors.UNSUPPORTED_SASL_MECHANISM.code} else
{Errors.UNACCEPTABLE_CREDENTIAL.code})
.setErrorMessage(errorMessage)) }
@@ -1028,7 +1027,7 @@ class ZkAdminManager(val config: KafkaConfig,
}).collect { case (user: String, exception: Exception) => (user,
exception) }.toMap
// report failures
- usersFailedToPrepareProperties.++(usersFailedToPersist).forKeyValue {
(user, exception) =>
+ usersFailedToPrepareProperties.++(usersFailedToPersist).foreachEntry {
(user, exception) =>
val error = Errors.forException(exception)
retval.results.add(new AlterUserScramCredentialsResult()
.setUser(user)
diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
index 7297b87c593..ca41ebda7f3 100755
--- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
@@ -27,7 +27,6 @@ import kafka.controller.StateChangeLogger
import kafka.server.{BrokerFeatures, CachedControllerId,
KRaftCachedControllerId, MetadataCache, ZkCachedControllerId}
import kafka.utils.CoreUtils._
import kafka.utils.Logging
-import kafka.utils.Implicits._
import org.apache.kafka.admin.BrokerMetadata
import org.apache.kafka.common.internals.Topic
import
org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataPartitionState,
UpdateMetadataTopicState}
@@ -74,7 +73,7 @@ object ZkMetadataCache {
val topicIdToNewState = new util.HashMap[Uuid, UpdateMetadataTopicState]()
requestTopicStates.forEach(state => topicIdToNewState.put(state.topicId(),
state))
val newRequestTopicStates = new util.ArrayList[UpdateMetadataTopicState]()
- currentMetadata.topicNames.forKeyValue((id, name) => {
+ currentMetadata.topicNames.foreachEntry((id, name) => {
try {
Option(topicIdToNewState.get(id)) match {
case None =>
@@ -560,7 +559,7 @@ class ZkMetadataCache(
} else {
//since kafka may do partial metadata updates, we start by copying the
previous state
val partitionStates = new mutable.AnyRefMap[String,
mutable.LongMap[UpdateMetadataPartitionState]](metadataSnapshot.partitionStates.size)
- metadataSnapshot.partitionStates.forKeyValue { (topic,
oldPartitionStates) =>
+ metadataSnapshot.partitionStates.foreachEntry { (topic,
oldPartitionStates) =>
val copy = new
mutable.LongMap[UpdateMetadataPartitionState](oldPartitionStates.size)
copy ++= oldPartitionStates
partitionStates(topic) = copy
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 57f84e39825..d4c49251232 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -23,7 +23,6 @@ import java.io._
import com.fasterxml.jackson.databind.node.{IntNode, JsonNodeFactory,
ObjectNode, TextNode}
import kafka.coordinator.transaction.TransactionLog
import kafka.log._
-import kafka.utils.Implicits._
import kafka.utils.{CoreUtils, VerifiableProperties}
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.message.ConsumerProtocolAssignment
@@ -90,7 +89,7 @@ object DumpLogSegments {
}
}
- misMatchesForIndexFilesMap.forKeyValue { (fileName, listOfMismatches) =>
+ misMatchesForIndexFilesMap.foreachEntry { (fileName, listOfMismatches) =>
System.err.println(s"Mismatches in :$fileName")
listOfMismatches.foreach { case (indexOffset, logOffset) =>
System.err.println(s" Index offset: $indexOffset, log offset:
$logOffset")
@@ -99,7 +98,7 @@ object DumpLogSegments {
timeIndexDumpErrors.printErrors()
- nonConsecutivePairsForLogFilesMap.forKeyValue { (fileName,
listOfNonConsecutivePairs) =>
+ nonConsecutivePairsForLogFilesMap.foreachEntry { (fileName,
listOfNonConsecutivePairs) =>
System.err.println(s"Non-consecutive offsets in $fileName")
listOfNonConsecutivePairs.foreach { case (first, second) =>
System.err.println(s" $first is followed by $second")
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala
b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 710bbddbd50..29c89c29898 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -36,7 +36,6 @@ import org.apache.kafka.network.SocketServerConfigs
import org.slf4j.event.Level
import java.util
-import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
/**
@@ -260,13 +259,6 @@ object CoreUtils {
}
}
- @nowarn("cat=unused") // see below for explanation
- def groupMapReduce[T, K, B](elements: Iterable[T])(key: T => K)(f: T =>
B)(reduce: (B, B) => B): Map[K, B] = {
- // required for Scala 2.12 compatibility, unused in Scala 2.13 and hence
we need to suppress the unused warning
- import scala.collection.compat._
- elements.groupMapReduce(key)(f)(reduce)
- }
-
def replicaToBrokerAssignmentAsScala(map: util.Map[Integer,
util.List[Integer]]): Map[Int, Seq[Int]] = {
map.asScala.map(e => (e._1.asInstanceOf[Int],
e._2.asScala.map(_.asInstanceOf[Int])))
}
diff --git a/core/src/main/scala/kafka/utils/Implicits.scala
b/core/src/main/scala/kafka/utils/Implicits.scala
index fbd22ec6b8c..80335992051 100644
--- a/core/src/main/scala/kafka/utils/Implicits.scala
+++ b/core/src/main/scala/kafka/utils/Implicits.scala
@@ -20,7 +20,6 @@ package kafka.utils
import java.util
import java.util.Properties
-import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
/**
@@ -46,22 +45,4 @@ object Implicits {
(properties: util.Hashtable[AnyRef, AnyRef]).putAll(map.asJava)
}
-
- /**
- * Exposes `forKeyValue` which maps to `foreachEntry` in Scala 2.13 and
`foreach` in Scala 2.12
- * (with the help of scala.collection.compat). `foreachEntry` avoids the
tuple allocation and
- * is more efficient.
- *
- * This was not named `foreachEntry` to avoid `unused import` warnings in
Scala 2.13 (the implicit
- * would not be triggered in Scala 2.13 since `Map.foreachEntry` would have
precedence).
- */
- @nowarn("cat=unused-imports")
- implicit class MapExtensionMethods[K, V](private val self:
scala.collection.Map[K, V]) extends AnyVal {
- import scala.collection.compat._
- def forKeyValue[U](f: (K, V) => U): Unit = {
- self.foreachEntry { (k, v) => f(k, v) }
- }
-
- }
-
}
diff --git a/core/src/main/scala/kafka/utils/json/DecodeJson.scala
b/core/src/main/scala/kafka/utils/json/DecodeJson.scala
index 71bfd61cee1..9c7bec0bdd1 100644
--- a/core/src/main/scala/kafka/utils/json/DecodeJson.scala
+++ b/core/src/main/scala/kafka/utils/json/DecodeJson.scala
@@ -17,10 +17,8 @@
package kafka.utils.json
-import scala.collection.{Map, Seq}
-import scala.collection.compat._
+import scala.collection.{Factory, Map, Seq}
import scala.jdk.CollectionConverters._
-
import com.fasterxml.jackson.databind.{JsonMappingException, JsonNode}
/**
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala
b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index 15c95b998b6..290802c5d11 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -22,7 +22,6 @@ import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.controller.ReplicaAssignment
import kafka.server.{DynamicConfig, KafkaConfig}
import kafka.utils._
-import kafka.utils.Implicits._
import org.apache.kafka.admin.{AdminUtils, BrokerMetadata}
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.errors._
@@ -311,7 +310,7 @@ class AdminZkClient(zkClient: KafkaZkClient,
expectedReplicationFactor: Int,
availableBrokerIds: Set[Int]): Unit = {
- replicaAssignment.forKeyValue { (partitionId, replicas) =>
+ replicaAssignment.foreachEntry { (partitionId, replicas) =>
if (replicas.isEmpty)
throw new InvalidReplicaAssignmentException(
s"Cannot have replication factor of 0 for partition id
$partitionId.")
diff --git a/core/src/test/java/kafka/admin/ConfigCommandTest.java
b/core/src/test/java/kafka/admin/ConfigCommandTest.java
index 3535374b6e0..5ccfa525a6d 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandTest.java
@@ -79,8 +79,8 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import scala.collection.JavaConverters;
import scala.collection.Seq;
+import scala.jdk.javaapi.CollectionConverters;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -1004,7 +1004,6 @@ public class ConfigCommandTest {
});
}
- @SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for
usages of JavaConverters
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void shouldAlterTopicConfig(boolean file) {
@@ -1013,7 +1012,7 @@ public class ConfigCommandTest {
addedConfigs.put("delete.retention.ms", "1000000");
addedConfigs.put("min.insync.replicas", "2");
if (file) {
- File f =
kafka.utils.TestUtils.tempPropertiesFile(JavaConverters.mapAsScalaMap(addedConfigs));
+ File f =
kafka.utils.TestUtils.tempPropertiesFile(CollectionConverters.asScala(addedConfigs));
filePath = f.getPath();
}
@@ -2292,8 +2291,7 @@ public class ConfigCommandTest {
}
}
- @SuppressWarnings({"deprecation"})
private <T> Seq<T> seq(Collection<T> seq) {
- return
JavaConverters.asScalaIteratorConverter(seq.iterator()).asScala().toSeq();
+ return CollectionConverters.asScala(seq).toSeq();
}
}
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 26ebed6bd86..ad4439faf54 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -26,8 +26,7 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
-import java.util
-import java.util.Properties
+import java.util.{Properties, stream}
import java.util.concurrent.atomic.AtomicInteger
import scala.jdk.CollectionConverters._
import scala.collection.Seq
@@ -117,38 +116,35 @@ object BaseConsumerTest {
// * KRaft and the classic group protocol
// * KRaft and the consumer group protocol
def getTestQuorumAndGroupProtocolParametersAll() :
java.util.stream.Stream[Arguments] = {
- util.Arrays.stream(Array(
- Arguments.of("zk", "classic"),
- Arguments.of("kraft", "classic"),
- Arguments.of("kraft", "consumer")
- ))
+ stream.Stream.of(
+ Arguments.of("zk", "classic"),
+ Arguments.of("kraft", "classic"),
+ Arguments.of("kraft", "consumer")
+ )
}
- // In Scala 2.12, it is necessary to disambiguate the
java.util.stream.Stream.of() method call
- // in the case where there's only a single Arguments in the list. The
following commented-out
- // method works in Scala 2.13, but not 2.12. For this reason, tests which
run against just a
- // single combination are written using @CsvSource rather than the more
elegant @MethodSource.
- // def getTestQuorumAndGroupProtocolParametersZkOnly() :
java.util.stream.Stream[Arguments] = {
- // java.util.stream.Stream.of(
- // Arguments.of("zk", "classic"))
- // }
+ def getTestQuorumAndGroupProtocolParametersZkOnly() :
java.util.stream.Stream[Arguments] = {
+ stream.Stream.of(
+ Arguments.of("zk", "classic")
+ )
+ }
// For tests that only work with the classic group protocol, we want to test
the following combinations:
// * ZooKeeper and the classic group protocol
// * KRaft and the classic group protocol
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly() :
java.util.stream.Stream[Arguments] = {
- util.Arrays.stream(Array(
- Arguments.of("zk", "classic"),
- Arguments.of("kraft", "classic")
- ))
+ stream.Stream.of(
+ Arguments.of("zk", "classic"),
+ Arguments.of("kraft", "classic")
+ )
}
// For tests that only work with the consumer group protocol, we want to
test the following combination:
// * KRaft and the consumer group protocol
- def getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly():
java.util.stream.Stream[Arguments] = {
- util.Arrays.stream(Array(
- Arguments.of("kraft", "consumer")
- ))
+ def getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly():
stream.Stream[Arguments] = {
+ stream.Stream.of(
+ Arguments.of("kraft", "consumer")
+ )
}
val updateProducerCount = new AtomicInteger()
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala
index 219ed9c2a1e..b4bc6f60a8e 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala
@@ -25,7 +25,6 @@ import org.apache.kafka.common.PartitionInfo
import java.util.stream.Stream
import scala.jdk.CollectionConverters._
import scala.collection.mutable
-import org.junit.jupiter.params.provider.CsvSource
/**
* Integration tests for the consumer that covers logic related to manual
assignment.
@@ -137,9 +136,7 @@ class PlaintextConsumerAssignTest extends
AbstractConsumerTest {
// partitionsFor not implemented in consumer group protocol and this test
requires ZK also
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
- @CsvSource(Array(
- "zk, classic"
- ))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersZkOnly"))
def testAssignAndConsumeWithLeaderChangeValidatingPositions(quorum: String,
groupProtocol: String): Unit = {
val numRecords = 10
val producer = createProducer()
@@ -243,4 +240,7 @@ class PlaintextConsumerAssignTest extends
AbstractConsumerTest {
object PlaintextConsumerAssignTest {
def getTestQuorumAndGroupProtocolParametersAll: Stream[Arguments] =
BaseConsumerTest.getTestQuorumAndGroupProtocolParametersAll()
+
+ def getTestQuorumAndGroupProtocolParametersZkOnly: Stream[Arguments] =
+ BaseConsumerTest.getTestQuorumAndGroupProtocolParametersZkOnly()
}
diff --git
a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
index 7e357eb6e2b..62733e5a6db 100644
--- a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
+++ b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer
import java.util.{Collections, Optional, Properties}
import kafka.network.SocketServer
import kafka.security.JaasTestUtils
-import kafka.utils.Implicits._
import org.apache.kafka.clients.admin.{Admin, NewTopic}
import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
import org.apache.kafka.common.protocol.ApiKeys
@@ -120,7 +119,7 @@ object IntegrationTestUtils {
replicaAssignment: Map[Int, Seq[Int]]
): Unit = {
val javaAssignment = new java.util.HashMap[Integer,
java.util.List[Integer]]()
- replicaAssignment.forKeyValue { (partitionId, assignment) =>
+ replicaAssignment.foreachEntry { (partitionId, assignment) =>
javaAssignment.put(partitionId, assignment.map(Int.box).asJava)
}
val newTopic = new NewTopic(topic, javaAssignment)
diff --git a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
index 66971c426f7..97a4b5d7019 100644
--- a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
@@ -24,7 +24,6 @@ import java.util.{Collections, Properties}
import com.yammer.metrics.core.Meter
import kafka.network.Processor.ListenerMetricTag
import kafka.server.KafkaConfig
-import kafka.utils.Implicits.MapExtensionMethods
import kafka.utils.TestUtils
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.metrics.internals.MetricsUtils
@@ -760,7 +759,7 @@ class ConnectionQuotasTest {
"Expected broker-connection-accept-rate metric to exist")
// add listeners and verify connection limits not exceeded
- listeners.forKeyValue { (name, listener) =>
+ listeners.foreachEntry { (name, listener) =>
val listenerName = listener.listenerName
connectionQuotas.addListener(config, listenerName)
connectionQuotas.maxConnectionsPerListener(listenerName).configure(listenerConfig)
@@ -785,14 +784,14 @@ class ConnectionQuotasTest {
}
private def verifyNoBlockedPercentRecordedOnAllListeners(): Unit = {
- blockedPercentMeters.forKeyValue { (name, meter) =>
+ blockedPercentMeters.foreachEntry { (name, meter) =>
assertEquals(0, meter.count(),
s"BlockedPercentMeter metric for $name listener")
}
}
private def verifyNonZeroBlockedPercentAndThrottleTimeOnAllListeners(): Unit
= {
- blockedPercentMeters.forKeyValue { (name, meter) =>
+ blockedPercentMeters.foreachEntry { (name, meter) =>
assertTrue(meter.count() > 0,
s"Expected BlockedPercentMeter metric for $name listener to be
recorded")
}
@@ -808,7 +807,7 @@ class ConnectionQuotasTest {
}
private def verifyOnlyNonInterBrokerListenersBlockedPercentRecorded(): Unit
= {
- blockedPercentMeters.forKeyValue { (name, meter) =>
+ blockedPercentMeters.foreachEntry { (name, meter) =>
name match {
case "REPLICATION" =>
assertEquals(0, meter.count(), s"BlockedPercentMeter metric for
$name listener")
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
index 45086961342..b3f514f9629 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
@@ -19,7 +19,6 @@ package kafka.server
import com.yammer.metrics.core.Gauge
import kafka.cluster.BrokerEndPoint
import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
-import kafka.utils.Implicits.MapExtensionMethods
import kafka.utils.TestUtils
import org.apache.kafka.common.message.FetchResponseData.PartitionData
import
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
@@ -255,7 +254,7 @@ class AbstractFetcherManagerTest {
fetcherManager.resizeThreadPool(newFetcherSize)
val ownedPartitions = mutable.Set.empty[TopicPartition]
- fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId,
fetcherThread) =>
+ fetcherManager.fetcherThreadMap.foreachEntry { (brokerIdAndFetcherId,
fetcherThread) =>
val fetcherId = brokerIdAndFetcherId.fetcherId
val brokerId = brokerIdAndFetcherId.brokerId
diff --git
a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala
b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala
index 3f27059ce8b..20d9b70a1ae 100644
--- a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala
@@ -18,7 +18,6 @@
package kafka.server
import com.yammer.metrics.core.{Histogram, Meter}
-import kafka.utils.Implicits.MapExtensionMethods
import kafka.utils.TestUtils
import org.apache.kafka.clients.{ClientResponse, NetworkClient}
import org.apache.kafka.common.errors.{AuthenticationException,
SaslAuthenticationException, UnsupportedVersionException}
@@ -92,7 +91,7 @@ class AddPartitionsToTxnManagerTest {
}
private def setErrors(errors: mutable.Map[TopicPartition,
Errors])(callbackErrors: Map[TopicPartition, Errors]): Unit = {
- callbackErrors.forKeyValue(errors.put)
+ callbackErrors.foreachEntry(errors.put)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
b/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
index d2d16718e6f..0d63f872e9d 100644
--- a/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
+++ b/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
@@ -20,7 +20,6 @@ package kafka.server
import kafka.cluster.BrokerEndPoint
import kafka.server.AbstractFetcherThread.ReplicaFetch
import kafka.server.AbstractFetcherThread.ResultWithPartitions
-import kafka.utils.Implicits.MapExtensionMethods
import org.apache.kafka.common.message.FetchResponseData
import
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -139,7 +138,7 @@ class MockLeaderEndPoint(sourceBroker: BrokerEndPoint = new
BrokerEndPoint(1, ho
override def fetchEpochEndOffsets(partitions: Map[TopicPartition,
EpochData]): Map[TopicPartition, EpochEndOffset] = {
val endOffsets = mutable.Map[TopicPartition, EpochEndOffset]()
- partitions.forKeyValue { (partition, epochData) =>
+ partitions.foreachEntry { (partition, epochData) =>
assert(partition.partition == epochData.partition,
"Partition must be consistent between TopicPartition and EpochData")
val leaderState = leaderPartitionState(partition)
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 81fc5ca422b..661a638aa88 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -1314,7 +1314,6 @@ class ReplicaFetcherThreadTest {
when(partition.localLogOrException).thenReturn(log)
when(partition.appendRecordsToFollowerOrFutureReplica(any[MemoryRecords],
any[Boolean])).thenReturn(appendInfo)
- // In Scala 2.12, the partitionsWithNewHighWatermark buffer is cleared
before the replicaManager mock is verified.
// Capture the argument at the time of invocation.
val completeDelayedFetchRequestsArgument =
mutable.Buffer.empty[TopicPartition]
val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
diff --git
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index cb73a53c3d2..0bb1c9661cb 100644
---
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -19,7 +19,6 @@ package kafka.server.epoch
import kafka.cluster.BrokerEndPoint
import kafka.server.KafkaConfig._
import kafka.server._
-import kafka.utils.Implicits._
import kafka.utils.TestUtils._
import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
@@ -311,7 +310,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness
with Logging {
def leaderOffsetsFor(partitions: Map[TopicPartition, Int]):
Map[TopicPartition, EpochEndOffset] = {
val topics = new OffsetForLeaderTopicCollection(partitions.size)
- partitions.forKeyValue { (topicPartition, leaderEpoch) =>
+ partitions.foreachEntry { (topicPartition, leaderEpoch) =>
var topic = topics.find(topicPartition.topic)
if (topic == null) {
topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 326bb1ed918..15ed5c2c17d 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -421,7 +421,7 @@ object TestUtils extends Logging {
topic, numPartitions, replicationFactor.toShort).configs(configsMap)))
} else {
val assignment = new util.HashMap[Integer, util.List[Integer]]()
- replicaAssignment.forKeyValue { case (k, v) =>
+ replicaAssignment.foreachEntry { case (k, v) =>
val replicas = new util.ArrayList[Integer]
v.foreach(r => replicas.add(r.asInstanceOf[Integer]))
assignment.put(k.asInstanceOf[Integer], replicas)
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 96fb97fda8f..4c81bf00d46 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -86,6 +86,10 @@
</li>
<li>Other changes:
<ul>
+ <li>
+ Scala 2.12 support has been removed in Apache Kafka 4.0.
+ See <a
href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218">KIP-751</a>
for more details.
+ </li>
<li>The <code>--delete-config</code> option in the
<code>kafka-topics</code> command line tool has been deprecated.
</li>
</ul>
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 8bb417a7362..45248cc7e28 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -20,29 +20,22 @@
ext {
versions = [:]
libs = [:]
-
- // Available if -PscalaVersion is used. This is useful when we want to
support a Scala version that has
- // a higher minimum Java requirement than Kafka. This was previously the
case for Scala 2.12 and Java 7.
- availableScalaVersions = [ '2.12', '2.13' ]
}
// Add Scala version
-def defaultScala212Version = '2.12.19'
def defaultScala213Version = '2.13.15'
if (hasProperty('scalaVersion')) {
- if (scalaVersion == '2.12') {
- versions["scala"] = defaultScala212Version
- } else if (scalaVersion == '2.13') {
+ if (scalaVersion == '2.13') {
versions["scala"] = defaultScala213Version
} else {
versions["scala"] = scalaVersion
}
} else {
- versions["scala"] = defaultScala212Version
+ versions["scala"] = defaultScala213Version
}
/* Resolve base Scala version according to these patterns:
- 1. generally available Scala versions (such as: 2.12.y and 2.13.z)
corresponding base versions will be: 2.12 and 2.13 (respectively)
+ 1. generally available Scala versions (such as: 2.13.z): the corresponding base
version will be 2.13
2. pre-release Scala versions (i.e. milestone/rc, such as: 2.13.0-M5,
2.13.0-RC1, 2.14.0-M1, etc.) will have identical base versions;
rationale: pre-release Scala versions are not binary compatible with each
other and that's the reason why libraries include the full
Scala release string in their name for pre-releases (see dependencies
below with an artifact name suffix '_$versions.baseScala')
@@ -54,13 +47,9 @@ if ( !versions.scala.contains('-') ) {
}
// mockito >= 5.5 is required for Java 21 and mockito 5.x requires at least
Java 11
-// mockito 4.9 is required for Scala 2.12 as a workaround for compiler errors
due to ambiguous reference to `Mockito.spy`
-// since Scala 2.12 support is going away soon, this is simpler than adjusting
the code.
// mockito 4.11 is used with Java 8 and Scala 2.13
String mockitoVersion
-if (scalaVersion == "2.12")
- mockitoVersion = "4.9.0"
-else if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_11))
+if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_11))
mockitoVersion = "5.10.0"
else
mockitoVersion = "4.11.0"
@@ -146,7 +135,6 @@ versions += [
pcollections: "4.0.1",
reload4j: "1.2.25",
rocksDB: "7.9.2",
- scalaCollectionCompat: "2.10.0",
// When updating the scalafmt version please also update the version field
in checkstyle/.scalafmt.conf. scalafmt now
// has the version field as mandatory in its configuration, see
// https://github.com/scalameta/scalafmt/releases/tag/v3.1.0.
@@ -246,7 +234,6 @@ libs += [
opentelemetryProto:
"io.opentelemetry.proto:opentelemetry-proto:$versions.opentelemetryProto",
reload4j: "ch.qos.reload4j:reload4j:$versions.reload4j",
rocksDBJni: "org.rocksdb:rocksdbjni:$versions.rocksDB",
- scalaCollectionCompat:
"org.scala-lang.modules:scala-collection-compat_$versions.baseScala:$versions.scalaCollectionCompat",
scalaJava8Compat:
"org.scala-lang.modules:scala-java8-compat_$versions.baseScala:$versions.scalaJava8Compat",
scalaLibrary: "org.scala-lang:scala-library:$versions.scala",
scalaLogging:
"com.typesafe.scala-logging:scala-logging_$versions.baseScala:$versions.scalaLogging",
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index bf540c4d7aa..e38ed4e21bf 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -184,68 +184,12 @@ For a detailed description of spotbugs bug categories,
see https://spotbugs.read
<Bug pattern="BX_UNBOXING_IMMEDIATELY_REBOXED"/>
</Match>
- <Match>
- <!-- Uncallable anonymous methods are left behind after inlining by
scalac 2.12, fixed in 2.13 -->
- <Source name="AdminUtils.scala"/>
- <Package name="kafka.admin"/>
- <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
- </Match>
-
- <Match>
- <!-- Uncallable anonymous methods are left behind after inlining by
scalac 2.12, fixed in 2.13 -->
- <Source name="ControllerContext.scala"/>
- <Package name="kafka.controller"/>
- <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
- </Match>
-
<Match>
<!-- offsets is a lazy val and it confuses spotBugs with its locking
scheme -->
<Class name="kafka.server.checkpoints.LazyOffsetCheckpointMap"/>
<Bug pattern="IS2_INCONSISTENT_SYNC"/>
</Match>
- <Match>
- <!-- Uncallable anonymous methods are left behind after inlining by
scalac 2.12, fixed in 2.13 -->
- <Source name="ReplicaManager.scala"/>
- <Package name="kafka.server"/>
- <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
- </Match>
-
- <Match>
- <!-- Uncallable anonymous methods are left behind after inlining by
scalac 2.12, fixed in 2.13 -->
- <Source name="LogManager.scala"/>
- <Package name="kafka.log"/>
- <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
- </Match>
-
- <Match>
- <!-- Uncallable anonymous methods are left behind after inlining by
scalac 2.12, fixed in 2.13 -->
- <Source name="DelayedElectLeader.scala"/>
- <Package name="kafka.server"/>
- <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
- </Match>
-
- <Match>
- <!-- Uncallable anonymous methods are left behind after inlining by
scalac 2.12, fixed in 2.13 -->
- <Source name="AdminZkClient.scala"/>
- <Package name="kafka.zk"/>
- <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
- </Match>
-
- <Match>
- <!-- Uncallable anonymous methods are left behind after inlining by
scalac 2.12, fixed in 2.13 -->
- <Source name="AuthHelper.scala"/>
- <Package name="kafka.server"/>
- <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
- </Match>
-
- <Match>
- <!-- Uncallable anonymous methods are left behind after inlining by
scalac 2.12, fixed in 2.13 -->
- <Source name="KafkaApis.scala"/>
- <Package name="kafka.server"/>
- <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
- </Match>
-
<Match>
<!-- Keeping this class for compatibility. It's deprecated and will be
removed in the next major release -->
<Source name="MessageFormatter.scala"/>
diff --git a/gradlewAll b/gradlewAll
index c09e9b4a485..21d6d7b7525 100755
--- a/gradlewAll
+++ b/gradlewAll
@@ -17,5 +17,9 @@
# Convenient way to invoke a gradle command with all Scala versions supported
# by default
-./gradlew "$@" -PscalaVersion=2.12 && ./gradlew "$@" -PscalaVersion=2.13
+# This script was originally designed to support multiple Scala versions (2.12
and 2.13),
+# but as Scala 2.12 is no longer supported, this script is no longer necessary.
+# We are keeping it for backwards compatibility. It will be removed in a
future release.
+echo "Warning: This script is deprecated and will be removed in a future
release."
+./gradlew "$@" -PscalaVersion=2.13
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ConcurrentMapBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ConcurrentMapBenchmark.java
index 776ca9b5816..7d7716cae2e 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ConcurrentMapBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ConcurrentMapBenchmark.java
@@ -189,4 +189,23 @@ public class ConcurrentMapBenchmark {
}
}
}
+
+ @Benchmark
+ @OperationsPerInvocation(TIMES)
+ public void testConcurrentHashMapComputeIfAbsentReadOnly(Blackhole
blackhole) {
+ for (int i = 0; i < TIMES; i++) {
+ blackhole.consume(concurrentHashMap.computeIfAbsent(
+ ThreadLocalRandom.current().nextInt(0, mapSize),
+ newValue -> Integer.MAX_VALUE
+ ));
+ }
+ }
+
+ @Benchmark
+ @OperationsPerInvocation(TIMES)
+ public void testConcurrentHashMapGetReadOnly(Blackhole blackhole) {
+ for (int i = 0; i < TIMES; i++) {
+
blackhole.consume(concurrentHashMap.get(ThreadLocalRandom.current().nextInt(0,
mapSize)));
+ }
+ }
}
diff --git a/release/release.py b/release/release.py
index 7710789bba8..67fc46c831d 100644
--- a/release/release.py
+++ b/release/release.py
@@ -326,7 +326,7 @@ except Exception as e:
git.targz(rc_tag, f"kafka-{release_version}-src/",
f"{artifacts_dir}/kafka-{release_version}-src.tgz")
-cmd("Building artifacts", "./gradlew clean && ./gradlewAll releaseTarGz",
cwd=kafka_dir, env=jdk8_env, shell=True)
+cmd("Building artifacts", "./gradlew clean && ./gradlew releaseTarGz
-PscalaVersion=2.13", cwd=kafka_dir, env=jdk8_env, shell=True)
cmd("Copying artifacts", f"cp {kafka_dir}/core/build/distributions/*
{artifacts_dir}", shell=True)
cmd("Building docs", "./gradlew clean aggregatedJavadoc", cwd=kafka_dir,
env=jdk17_env)
cmd("Copying docs", f"cp -R {kafka_dir}/build/docs/javadoc {artifacts_dir}")
@@ -351,7 +351,7 @@ cmd("Zipping artifacts", f"tar -czf {artifact_name}.tar.gz
{artifact_name}", cwd
sftp.upload_artifacts(apache_id, artifacts_dir)
confirm_or_fail("Going to build and upload mvn artifacts based on these
settings:\n" + textfiles.read(global_gradle_props) + '\nOK?')
-cmd("Building and uploading archives", "./gradlewAll publish", cwd=kafka_dir,
env=jdk8_env, shell=True)
+cmd("Building and uploading archives", "./gradlew publish
-PscalaVersion=2.13", cwd=kafka_dir, env=jdk8_env, shell=True)
cmd("Building and uploading archives", "mvn deploy -Pgpg-signing",
cwd=os.path.join(kafka_dir, "streams/quickstart"), env=jdk8_env, shell=True)
# TODO: Many of these suggested validation steps could be automated
diff --git
a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index b9cbb9f3a76..08b12ff442a 100644
---
a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++
b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -77,7 +77,6 @@ import static
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LIS
import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIRS_CONFIG;
-@SuppressWarnings("deprecation") // Needed for Scala 2.12 compatibility
public class KafkaClusterTestKit implements AutoCloseable {
private static final Logger log =
LoggerFactory.getLogger(KafkaClusterTestKit.class);