This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 83fb40d7435 KAFKA-14895 [1/N] Move AddPartitionsToTxnManager files to 
java (#19879)
83fb40d7435 is described below

commit 83fb40d74350664a051366545f3098950bfd5b67
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Sun Jun 8 00:16:55 2025 +0800

    KAFKA-14895 [1/N] Move AddPartitionsToTxnManager files to java (#19879)
    
    Move AddPartitionsToTxnManager to server module and convert to Java.
    This patch moves AddPartitionsToTxnManager from the core module to the
    server module, with its package updated from `kafka.server` to
    `org.apache.kafka.server.transaction`. Additionally, several
    configurations used by AddPartitionsToTxnManager are moved from
    KafkaConfig.scala to AbstractKafkaConfig.java:
    - brokerId
    - requestTimeoutMs
    - controllerListenerNames
    - interBrokerListenerName
    - interBrokerSecurityProtocol
    - effectiveListenerSecurityProtocolMap
    
    The next PR will convert AddPartitionsToTxnManagerTest.scala to Java.
    
    Reviewers: Justine Olshan <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../group/CoordinatorPartitionWriter.scala         |   3 +-
 core/src/main/scala/kafka/raft/RaftManager.scala   |   8 +-
 .../kafka/server/AddPartitionsToTxnManager.scala   | 315 ------------------
 .../src/main/scala/kafka/server/BrokerServer.scala |   1 +
 .../scala/kafka/server/DynamicBrokerConfig.scala   |   4 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   1 +
 core/src/main/scala/kafka/server/KafkaConfig.scala |  98 +-----
 .../server/NodeToControllerChannelManager.scala    |   5 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  18 +-
 core/src/main/scala/kafka/tools/StorageTool.scala  |   2 +-
 core/src/main/scala/kafka/utils/CoreUtils.scala    |   7 +-
 .../kafka/server/QuorumTestHarness.scala           |   2 +-
 .../AbstractCoordinatorConcurrencyTest.scala       |   1 +
 .../server/AddPartitionsToTxnManagerTest.scala     |  67 ++--
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  30 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  59 ++--
 .../kafka/server/config/AbstractKafkaConfig.java   | 115 +++++++
 .../transaction/AddPartitionsToTxnManager.java     | 357 +++++++++++++++++++++
 .../test/junit/RaftClusterInvocationContext.java   |   2 +-
 19 files changed, 591 insertions(+), 504 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala 
b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
index 62c5bc2d6d9..dbbdbb09868 100644
--- 
a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
+++ 
b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
@@ -17,13 +17,14 @@
 package kafka.coordinator.group
 
 import kafka.cluster.PartitionListener
-import kafka.server.{AddPartitionsToTxnManager, ReplicaManager}
+import kafka.server.ReplicaManager
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
 import org.apache.kafka.coordinator.common.runtime.PartitionWriter
 import org.apache.kafka.server.ActionQueue
 import org.apache.kafka.server.common.RequestLocal
+import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
 import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, 
VerificationGuard}
 
 import java.util.concurrent.CompletableFuture
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala 
b/core/src/main/scala/kafka/raft/RaftManager.scala
index d7775d88dbe..9e8ea38f8fd 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -236,11 +236,9 @@ class KafkaRaftManager[T](
   }
 
   private def buildNetworkClient(): (ListenerName, NetworkClient) = {
-    val controllerListenerName = new 
ListenerName(config.controllerListenerNames.head)
-    val controllerSecurityProtocol = 
config.effectiveListenerSecurityProtocolMap.getOrElse(
-      controllerListenerName,
-      SecurityProtocol.forName(controllerListenerName.value())
-    )
+    val controllerListenerName = new 
ListenerName(config.controllerListenerNames.get(0))
+    val controllerSecurityProtocol = 
Option(config.effectiveListenerSecurityProtocolMap.get(controllerListenerName))
+      .getOrElse(SecurityProtocol.forName(controllerListenerName.value()))
     val channelBuilder = ChannelBuilders.clientChannelBuilder(
       controllerSecurityProtocol,
       JaasContext.Type.SERVER,
diff --git a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala 
b/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala
deleted file mode 100644
index b7e3bd36d84..00000000000
--- a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala
+++ /dev/null
@@ -1,315 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import 
kafka.server.AddPartitionsToTxnManager.{VerificationFailureRateMetricName, 
VerificationTimeMsMetricName}
-import kafka.utils.Logging
-import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestCompletionHandler}
-import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.{Node, TopicPartition}
-import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic,
 AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction, 
AddPartitionsToTxnTransactionCollection}
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, 
AddPartitionsToTxnResponse, MetadataResponse}
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.metadata.MetadataCache
-import org.apache.kafka.server.metrics.KafkaMetricsGroup
-import org.apache.kafka.server.util.{InterBrokerSendThread, 
RequestAndCompletionHandler}
-
-import java.util
-import java.util.concurrent.TimeUnit
-import scala.collection.{Seq, mutable}
-import scala.jdk.CollectionConverters._
-
-object AddPartitionsToTxnManager {
-  type AppendCallback = Map[TopicPartition, Errors] => Unit
-
-  val VerificationFailureRateMetricName = "VerificationFailureRate"
-  val VerificationTimeMsMetricName = "VerificationTimeMs"
-
-  def produceRequestVersionToTransactionSupportedOperation(version: Short): 
TransactionSupportedOperation = {
-    if (version > 11) {
-      addPartition
-    } else if (version > 10) {
-      genericErrorSupported
-    } else {
-      defaultError
-    }
-  }
-
-  def txnOffsetCommitRequestVersionToTransactionSupportedOperation(version: 
Int): TransactionSupportedOperation = {
-    if (version > 4) {
-      addPartition
-    } else if (version > 3) {
-      genericErrorSupported
-    } else {
-      defaultError
-    }
-  }
-}
-
-/**
- * This is an enum which handles the Partition Response based on the Request 
Version and the exact operation
- *    defaultError:          This is the default workflow which maps to cases 
when the Produce Request Version or the Txn_offset_commit request was lower 
than the first version supporting the new Error Class
- *    genericErrorSupported: This maps to the case when the clients are 
updated to handle the TransactionAbortableException
- *    addPartition:          This allows the partition to be added to the 
transactions inflight with the Produce and TxnOffsetCommit requests. Plus the 
behaviors in genericErrorSupported.
- */
-sealed trait TransactionSupportedOperation {
-  val supportsEpochBump = false;
-}
-case object defaultError extends TransactionSupportedOperation
-case object genericErrorSupported extends TransactionSupportedOperation
-case object addPartition extends TransactionSupportedOperation {
-  override val supportsEpochBump = true
-}
-
-/*
- * Data structure to hold the transactional data to send to a node. Note -- at 
most one request per transactional ID
- * will exist at a time in the map. If a given transactional ID exists in the 
map, and a new request with the same ID
- * comes in, one request will be in the map and one will return to the 
producer with a response depending on the epoch.
- */
-class TransactionDataAndCallbacks(val transactionData: 
AddPartitionsToTxnTransactionCollection,
-                                  val callbacks: mutable.Map[String, 
AddPartitionsToTxnManager.AppendCallback],
-                                  val startTimeMs: mutable.Map[String, Long],
-                                  val transactionSupportedOperation: 
TransactionSupportedOperation)
-
-class AddPartitionsToTxnManager(
-  config: KafkaConfig,
-  client: NetworkClient,
-  metadataCache: MetadataCache,
-  partitionFor: String => Int,
-  time: Time
-) extends InterBrokerSendThread(
-  "AddPartitionsToTxnSenderThread-" + config.brokerId,
-  client,
-  config.requestTimeoutMs,
-  time
-) with Logging {
-
-  this.logIdent = logPrefix
-
-  private val interBrokerListenerName = config.interBrokerListenerName
-  private val inflightNodes = mutable.HashSet[Node]()
-  private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
-
-  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
-  private val verificationFailureRate = 
metricsGroup.newMeter(VerificationFailureRateMetricName, "failures", 
TimeUnit.SECONDS)
-  private val verificationTimeMs = 
metricsGroup.newHistogram(VerificationTimeMsMetricName)
-
-  def addOrVerifyTransaction(
-    transactionalId: String,
-    producerId: Long,
-    producerEpoch: Short,
-    topicPartitions: Seq[TopicPartition],
-    callback: AddPartitionsToTxnManager.AppendCallback,
-    transactionSupportedOperation: TransactionSupportedOperation
-  ): Unit = {
-    val coordinatorNode = 
getTransactionCoordinator(partitionFor(transactionalId))
-    if (coordinatorNode.isEmpty) {
-      callback(topicPartitions.map(tp => tp -> 
Errors.COORDINATOR_NOT_AVAILABLE).toMap)
-    } else {
-      val topicCollection = new AddPartitionsToTxnTopicCollection()
-      topicPartitions.groupBy(_.topic).foreachEntry { (topic, tps) =>
-        topicCollection.add(new AddPartitionsToTxnTopic()
-          .setName(topic)
-          .setPartitions(tps.map(tp => Int.box(tp.partition)).toList.asJava))
-      }
-
-      val transactionData = new AddPartitionsToTxnTransaction()
-        .setTransactionalId(transactionalId)
-        .setProducerId(producerId)
-        .setProducerEpoch(producerEpoch)
-        .setVerifyOnly(!transactionSupportedOperation.supportsEpochBump)
-        .setTopics(topicCollection)
-
-      addTxnData(coordinatorNode.get, transactionData, callback, 
transactionSupportedOperation)
-
-    }
-  }
-
-  private def addTxnData(
-    node: Node,
-    transactionData: AddPartitionsToTxnTransaction,
-    callback: AddPartitionsToTxnManager.AppendCallback,
-    transactionSupportedOperation: TransactionSupportedOperation
-  ): Unit = {
-    nodesToTransactions.synchronized {
-      val curTime = time.milliseconds()
-      // Check if we have already have either node or individual transaction. 
Add the Node if it isn't there.
-      val existingNodeAndTransactionData = 
nodesToTransactions.getOrElseUpdate(node,
-        new TransactionDataAndCallbacks(
-          new AddPartitionsToTxnTransactionCollection(1),
-          mutable.Map[String, AddPartitionsToTxnManager.AppendCallback](),
-          mutable.Map[String, Long](),
-          transactionSupportedOperation))
-
-      val existingTransactionData = 
existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
-
-      // There are 3 cases if we already have existing data
-      // 1. Incoming data has a higher epoch -- return INVALID_PRODUCER_EPOCH 
for existing data since it is fenced
-      // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION for 
existing data, since the client is likely retrying and we want another 
retriable exception
-      // 3. Incoming data has a lower epoch -- return INVALID_PRODUCER_EPOCH 
for the incoming data since it is fenced, do not add incoming data to verify
-      if (existingTransactionData != null) {
-        if (existingTransactionData.producerEpoch <= 
transactionData.producerEpoch) {
-          val error = if (existingTransactionData.producerEpoch < 
transactionData.producerEpoch)
-            Errors.INVALID_PRODUCER_EPOCH
-          else
-            Errors.NETWORK_EXCEPTION
-          val oldCallback = 
existingNodeAndTransactionData.callbacks(transactionData.transactionalId)
-          
existingNodeAndTransactionData.transactionData.remove(transactionData)
-          sendCallback(oldCallback, 
topicPartitionsToError(existingTransactionData, error), 
existingNodeAndTransactionData.startTimeMs(transactionData.transactionalId))
-        } else {
-          // If the incoming transactionData's epoch is lower, we can return 
with INVALID_PRODUCER_EPOCH immediately.
-          sendCallback(callback, topicPartitionsToError(transactionData, 
Errors.INVALID_PRODUCER_EPOCH), curTime)
-          return
-        }
-      }
-
-      existingNodeAndTransactionData.transactionData.add(transactionData)
-      
existingNodeAndTransactionData.callbacks.put(transactionData.transactionalId, 
callback)
-      
existingNodeAndTransactionData.startTimeMs.put(transactionData.transactionalId, 
curTime)
-      wakeup()
-    }
-  }
-
-  private def getTransactionCoordinator(partition: Int): util.Optional[Node] = 
{
-   metadataCache.getLeaderAndIsr(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)
-      .filter(_.leader != MetadataResponse.NO_LEADER_ID)
-      .flatMap(metadata => metadataCache.getAliveBrokerNode(metadata.leader, 
interBrokerListenerName))
-  }
-
-  private def topicPartitionsToError(transactionData: 
AddPartitionsToTxnTransaction, error: Errors): Map[TopicPartition, Errors] = {
-    val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
-    transactionData.topics.forEach { topic =>
-      topic.partitions.forEach { partition =>
-        topicPartitionsToError.put(new TopicPartition(topic.name, partition), 
error)
-      }
-    }
-    verificationFailureRate.mark(topicPartitionsToError.size)
-    topicPartitionsToError.toMap
-  }
-
-  private def sendCallback(callback: AddPartitionsToTxnManager.AppendCallback, 
errorMap: Map[TopicPartition, Errors], startTimeMs: Long): Unit = {
-    verificationTimeMs.update(time.milliseconds() - startTimeMs)
-    callback(errorMap)
-  }
-
-  private class AddPartitionsToTxnHandler(node: Node, 
transactionDataAndCallbacks: TransactionDataAndCallbacks) extends 
RequestCompletionHandler {
-    override def onComplete(response: ClientResponse): Unit = {
-      // Note: Synchronization is not needed on inflightNodes since it is 
always accessed from this thread.
-      inflightNodes.remove(node)
-      if (response.authenticationException != null) {
-        error(s"AddPartitionsToTxnRequest failed for node 
${response.destination} with an " +
-          "authentication exception.", response.authenticationException)
-        
sendCallbacksToAll(Errors.forException(response.authenticationException).code)
-      } else if (response.versionMismatch != null) {
-        // We may see unsupported version exception if we try to send a verify 
only request to a broker that can't handle it.
-        // In this case, skip verification.
-        warn(s"AddPartitionsToTxnRequest failed for node 
${response.destination} with invalid version exception. This suggests 
verification is not supported." +
-          s"Continuing handling the produce request.")
-        transactionDataAndCallbacks.callbacks.foreach { case (txnId, callback) 
=>
-          sendCallback(callback, Map.empty, 
transactionDataAndCallbacks.startTimeMs(txnId))
-        }
-      } else if (response.wasDisconnected || response.wasTimedOut) {
-        warn(s"AddPartitionsToTxnRequest failed for node 
${response.destination} with a network exception.")
-        sendCallbacksToAll(Errors.NETWORK_EXCEPTION.code)
-      } else {
-        val addPartitionsToTxnResponseData = 
response.responseBody.asInstanceOf[AddPartitionsToTxnResponse].data
-        if (addPartitionsToTxnResponseData.errorCode != 0) {
-          error(s"AddPartitionsToTxnRequest for node ${response.destination} 
returned with error 
${Errors.forCode(addPartitionsToTxnResponseData.errorCode)}.")
-          // The client should not be exposed to CLUSTER_AUTHORIZATION_FAILED 
so modify the error to signify the verification did not complete.
-          // Return INVALID_TXN_STATE.
-          val finalError = if (addPartitionsToTxnResponseData.errorCode == 
Errors.CLUSTER_AUTHORIZATION_FAILED.code)
-            Errors.INVALID_TXN_STATE.code
-          else
-            addPartitionsToTxnResponseData.errorCode
-
-          sendCallbacksToAll(finalError)
-        } else {
-          addPartitionsToTxnResponseData.resultsByTransaction.forEach { 
transactionResult =>
-            val unverified = mutable.Map[TopicPartition, Errors]()
-            transactionResult.topicResults.forEach { topicResult =>
-              topicResult.resultsByPartition.forEach { partitionResult =>
-                val tp = new TopicPartition(topicResult.name, 
partitionResult.partitionIndex)
-                if (partitionResult.partitionErrorCode != Errors.NONE.code) {
-                  // Producers expect to handle INVALID_PRODUCER_EPOCH in this 
scenario.
-                  val code =
-                    if (partitionResult.partitionErrorCode == 
Errors.PRODUCER_FENCED.code)
-                      Errors.INVALID_PRODUCER_EPOCH.code
-                    else if (partitionResult.partitionErrorCode() == 
Errors.TRANSACTION_ABORTABLE.code
-                      && 
transactionDataAndCallbacks.transactionSupportedOperation == defaultError) // 
For backward compatibility with clients.
-                      Errors.INVALID_TXN_STATE.code
-                    else
-                      partitionResult.partitionErrorCode
-                  unverified.put(tp, Errors.forCode(code))
-                }
-              }
-            }
-            verificationFailureRate.mark(unverified.size)
-            val callback = 
transactionDataAndCallbacks.callbacks(transactionResult.transactionalId)
-            sendCallback(callback, unverified.toMap, 
transactionDataAndCallbacks.startTimeMs(transactionResult.transactionalId))
-          }
-        }
-      }
-      wakeup()
-    }
-
-    private def buildErrorMap(transactionalId: String, errorCode: Short): 
Map[TopicPartition, Errors] = {
-      val transactionData = 
transactionDataAndCallbacks.transactionData.find(transactionalId)
-      topicPartitionsToError(transactionData, Errors.forCode(errorCode))
-    }
-
-    private def sendCallbacksToAll(errorCode: Short): Unit = {
-      transactionDataAndCallbacks.callbacks.foreach { case (txnId, callback) =>
-        sendCallback(callback, buildErrorMap(txnId, errorCode), 
transactionDataAndCallbacks.startTimeMs(txnId))
-      }
-    }
-  }
-
-  override def generateRequests(): 
util.Collection[RequestAndCompletionHandler] = {
-    // build and add requests to queue
-    val list = new util.ArrayList[RequestAndCompletionHandler]()
-    val currentTimeMs = time.milliseconds()
-    val removedNodes = mutable.Set[Node]()
-    nodesToTransactions.synchronized {
-      nodesToTransactions.foreach { case (node, transactionDataAndCallbacks) =>
-        if (!inflightNodes.contains(node)) {
-          list.add(new RequestAndCompletionHandler(
-            currentTimeMs,
-            node,
-            
AddPartitionsToTxnRequest.Builder.forBroker(transactionDataAndCallbacks.transactionData),
-            new AddPartitionsToTxnHandler(node, transactionDataAndCallbacks)
-          ))
-
-          removedNodes.add(node)
-        }
-      }
-      removedNodes.foreach { node =>
-        inflightNodes.add(node)
-        nodesToTransactions.remove(node)
-      }
-    }
-    list
-  }
-
-  override def shutdown(): Unit = {
-    super.shutdown()
-    metricsGroup.removeMetric(VerificationFailureRateMetricName)
-    metricsGroup.removeMetric(VerificationTimeMsMetricName)
-  }
-
-}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 22749f1f4af..22c6847ded0 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -55,6 +55,7 @@ import org.apache.kafka.server.share.session.ShareSessionCache
 import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
 import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
 import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, 
ClientMetricsManager, DefaultApiVersionManager, DelayedActionQueue, 
DelegationTokenManager, ProcessRole}
+import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 94131c65d8c..bdd9c94a319 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -973,7 +973,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends 
BrokerReconfigurable wi
     val oldListeners = oldConfig.listeners.map(l => 
ListenerName.normalised(l.listener)).toSet
     if (!oldAdvertisedListeners.subsetOf(newListeners))
       throw new ConfigException(s"Advertised listeners 
'$oldAdvertisedListeners' must be a subset of listeners '$newListeners'")
-    if 
(!newListeners.subsetOf(newConfig.effectiveListenerSecurityProtocolMap.keySet))
+    if 
(!newListeners.subsetOf(newConfig.effectiveListenerSecurityProtocolMap.keySet.asScala))
       throw new ConfigException(s"Listeners '$newListeners' must be subset of 
listener map '${newConfig.effectiveListenerSecurityProtocolMap}'")
     newListeners.intersect(oldListeners).foreach { listenerName =>
       def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): 
Map[String, AnyRef] = {
@@ -985,7 +985,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends 
BrokerReconfigurable wi
       if (immutableListenerConfigs(newConfig, listenerName.configPrefix) != 
immutableListenerConfigs(oldConfig, listenerName.configPrefix))
         throw new ConfigException(s"Configs cannot be updated dynamically for 
existing listener $listenerName, " +
           "restart broker or create a new listener for update")
-      if (oldConfig.effectiveListenerSecurityProtocolMap(listenerName) != 
newConfig.effectiveListenerSecurityProtocolMap(listenerName))
+      if (oldConfig.effectiveListenerSecurityProtocolMap.get(listenerName) != 
newConfig.effectiveListenerSecurityProtocolMap.get(listenerName))
         throw new ConfigException(s"Security protocol cannot be updated for 
existing listener $listenerName")
     }
   }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 595ef5dc2c5..3ca43aa145b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -68,6 +68,7 @@ import org.apache.kafka.server.share.context.ShareFetchContext
 import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, 
SharePartitionKey}
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, 
FetchPartitionData}
+import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
 import org.apache.kafka.storage.internals.log.AppendOrigin
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 487e0b1fac4..67c6febe1ca 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
 import java.util.Properties
 import kafka.utils.{CoreUtils, Logging}
 import kafka.utils.Implicits._
-import org.apache.kafka.common.{Endpoint, KafkaException, Reconfigurable}
+import org.apache.kafka.common.{Endpoint, Reconfigurable}
 import org.apache.kafka.common.config.{ConfigDef, ConfigException, 
ConfigResource, TopicConfig}
 import org.apache.kafka.common.config.ConfigDef.ConfigKey
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
@@ -44,10 +44,10 @@ import org.apache.kafka.security.authorizer.AuthorizerUtils
 import org.apache.kafka.server.ProcessRole
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.config.AbstractKafkaConfig.getMap
 import org.apache.kafka.server.config.{AbstractKafkaConfig, KRaftConfigs, 
QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.MetricConfigs
-import org.apache.kafka.server.util.Csv
 import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
 
 import scala.jdk.CollectionConverters._
@@ -144,14 +144,6 @@ object KafkaConfig {
     }
     output
   }
-
-  private def parseListenerName(connectionString: String): String = {
-    val firstColon = connectionString.indexOf(':')
-    if (firstColon < 0) {
-      throw new KafkaException(s"Unable to parse a listener name from 
$connectionString")
-    }
-    connectionString.substring(0, firstColon).toUpperCase(util.Locale.ROOT)
-  }
 }
 
 /**
@@ -216,7 +208,6 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   def quotaConfig: QuotaConfig = _quotaConfig
 
   /** ********* General Configuration ***********/
-  var brokerId: Int = getInt(ServerConfigs.BROKER_ID_CONFIG)
   val nodeId: Int = getInt(KRaftConfigs.NODE_ID_CONFIG)
   val initialRegistrationTimeoutMs: Int = 
getInt(KRaftConfigs.INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG)
   val brokerHeartbeatIntervalMs: Int = 
getInt(KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG)
@@ -255,7 +246,6 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   val serverMaxStartupTimeMs = 
getLong(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG)
 
   def messageMaxBytes = getInt(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG)
-  val requestTimeoutMs = getInt(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG)
   val connectionSetupTimeoutMs = 
getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG)
   val connectionSetupTimeoutMaxMs = 
getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG)
 
@@ -306,7 +296,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   val socketListenBacklogSize = 
getInt(SocketServerConfigs.SOCKET_LISTEN_BACKLOG_SIZE_CONFIG)
   def maxConnectionsPerIp = 
getInt(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG)
   def maxConnectionsPerIpOverrides: Map[String, Int] =
-    getMap(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, 
getString(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG)).map { 
case (k, v) => (k, v.toInt)}
+    getMap(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, 
getString(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG)).asScala.map
 { case (k, v) => (k, v.toInt)}
   def maxConnections = getInt(SocketServerConfigs.MAX_CONNECTIONS_CONFIG)
   def maxConnectionCreationRate = 
getInt(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG)
   val connectionsMaxIdleMs = 
getLong(SocketServerConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG)
@@ -408,8 +398,6 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
       Set.empty[String]
   }
 
-  def interBrokerListenerName = 
getInterBrokerListenerNameAndSecurityProtocol._1
-  def interBrokerSecurityProtocol = 
getInterBrokerListenerNameAndSecurityProtocol._2
   def saslMechanismInterBrokerProtocol = 
getString(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG)
 
   /** ********* Fetch Configuration **************/
@@ -453,26 +441,9 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
     millis
   }
 
-  private def getMap(propName: String, propValue: String): Map[String, String] 
= {
-    try {
-      Csv.parseCsvMap(propValue).asScala
-    } catch {
-      case e: Exception => throw new IllegalArgumentException("Error parsing 
configuration property '%s': %s".format(propName, e.getMessage))
-    }
-  }
-
   def listeners: Seq[Endpoint] =
     
CoreUtils.listenerListToEndPoints(getString(SocketServerConfigs.LISTENERS_CONFIG),
 effectiveListenerSecurityProtocolMap)
 
-  def controllerListenerNames: Seq[String] = {
-    val value = 
Option(getString(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG)).getOrElse("")
-    if (value.isEmpty) {
-      Seq.empty
-    } else {
-      value.split(",")
-    }
-  }
-
   def controllerListeners: Seq[Endpoint] =
     listeners.filter(l => controllerListenerNames.contains(l.listener))
 
@@ -495,7 +466,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
     }
     val controllerListenersValue = controllerListeners
 
-    controllerListenerNames.flatMap { name =>
+    controllerListenerNames.asScala.flatMap { name =>
       controllerAdvertisedListeners
         .find(endpoint => 
ListenerName.normalised(endpoint.listener).equals(ListenerName.normalised(name)))
         .orElse(
@@ -526,57 +497,6 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
     advertisedListeners.filterNot(l => 
controllerListenerNames.contains(l.listener))
   }
 
-  private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, 
SecurityProtocol) = {
-    Option(getString(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG)) 
match {
-      case Some(_) if 
originals.containsKey(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG) 
=>
-        throw new ConfigException(s"Only one of 
${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} and " +
-          s"${ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG} should 
be set.")
-      case Some(name) =>
-        val listenerName = ListenerName.normalised(name)
-        val securityProtocol = 
effectiveListenerSecurityProtocolMap.getOrElse(listenerName,
-          throw new ConfigException(s"Listener with name ${listenerName.value} 
defined in " +
-            s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} not 
found in ${SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG}."))
-        (listenerName, securityProtocol)
-      case None =>
-        val securityProtocol = 
getSecurityProtocol(getString(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG),
-          ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG)
-        (ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
-    }
-  }
-
-  private def getSecurityProtocol(protocolName: String, configName: String): 
SecurityProtocol = {
-    try SecurityProtocol.forName(protocolName)
-    catch {
-      case _: IllegalArgumentException =>
-        throw new ConfigException(s"Invalid security protocol `$protocolName` 
defined in $configName")
-    }
-  }
-
-  def effectiveListenerSecurityProtocolMap: Map[ListenerName, 
SecurityProtocol] = {
-    val mapValue = 
getMap(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, 
getString(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG))
-      .map { case (listenerName, protocolName) =>
-        ListenerName.normalised(listenerName) -> 
getSecurityProtocol(protocolName, 
SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)
-      }
-    if 
(!originals.containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG))
 {
-      // Nothing was specified explicitly for listener.security.protocol.map, 
so we are using the default value,
-      // and we are using KRaft.
-      // Add PLAINTEXT mappings for controller listeners as long as there is 
no SSL or SASL_{PLAINTEXT,SSL} in use
-      def isSslOrSasl(name: String): Boolean = 
name.equals(SecurityProtocol.SSL.name) || 
name.equals(SecurityProtocol.SASL_SSL.name) || 
name.equals(SecurityProtocol.SASL_PLAINTEXT.name)
-      // check controller listener names (they won't appear in listeners when 
process.roles=broker)
-      // as well as listeners for occurrences of SSL or SASL_*
-      if (controllerListenerNames.exists(isSslOrSasl) ||
-        
Csv.parseCsvList(getString(SocketServerConfigs.LISTENERS_CONFIG)).asScala.exists(listenerValue
 => isSslOrSasl(KafkaConfig.parseListenerName(listenerValue)))) {
-        mapValue // don't add default mappings since we found something that 
is SSL or SASL_*
-      } else {
-        // add the PLAINTEXT mappings for all controller listener names that 
are not explicitly PLAINTEXT
-        mapValue ++ 
controllerListenerNames.filterNot(SecurityProtocol.PLAINTEXT.name.equals(_)).map(
-          new ListenerName(_) -> SecurityProtocol.PLAINTEXT)
-      }
-    } else {
-      mapValue
-    }
-  }
-
   validateValues()
 
   private def validateValues(): Unit = {
@@ -617,7 +537,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
     }
     def 
validateControllerListenerNamesMustAppearInListenersForKRaftController(): Unit 
= {
       val listenerNameValues = listeners.map(_.listener).toSet
-      require(controllerListenerNames.forall(cln => 
listenerNameValues.contains(cln)),
+      require(controllerListenerNames.stream().allMatch(cln => 
listenerNameValues.contains(cln)),
         s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must only contain 
values appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration 
when running the KRaft controller role")
     }
     def validateAdvertisedBrokerListenersNonEmptyForBroker(): Unit = {
@@ -637,22 +557,22 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
       require(!voterIds.contains(nodeId),
         s"If ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains just the 'broker' 
role, the node id $nodeId must not be included in the set of voters 
${QuorumConfig.QUORUM_VOTERS_CONFIG}=${voterIds.asScala.toSet}")
       // controller.listener.names must be non-empty...
-      require(controllerListenerNames.nonEmpty,
+      require(controllerListenerNames.size() > 0,
         s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must contain at 
least one value when running KRaft with just the broker role")
       // controller.listener.names are forbidden in listeners...
       require(controllerListeners.isEmpty,
         s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must not contain a 
value appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration 
when running KRaft with just the broker role")
       // controller.listener.names must all appear in 
listener.security.protocol.map
-      controllerListenerNames.foreach { name =>
+      controllerListenerNames.forEach { name =>
         val listenerName = ListenerName.normalised(name)
-        if (!effectiveListenerSecurityProtocolMap.contains(listenerName)) {
+        if (!effectiveListenerSecurityProtocolMap.containsKey(listenerName)) {
           throw new ConfigException(s"Controller listener with name 
${listenerName.value} defined in " +
             s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} not found in 
${SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG}  (an explicit 
security mapping for each controller listener is required if 
${SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG} is non-empty, or 
if there are security protocols other than PLAINTEXT in use)")
         }
       }
       // warn that only the first controller listener is used if there is more 
than one
       if (controllerListenerNames.size > 1) {
-        warn(s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} has multiple 
entries; only the first will be used since 
${KRaftConfigs.PROCESS_ROLES_CONFIG}=broker: ${controllerListenerNames.asJava}")
+        warn(s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} has multiple 
entries; only the first will be used since 
${KRaftConfigs.PROCESS_ROLES_CONFIG}=broker: ${controllerListenerNames}")
       }
       // warn if create.topic.policy.class.name or 
alter.config.policy.class.name is defined in the broker role
       warnIfConfigDefinedInWrongRole(ProcessRole.ControllerRole, 
ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG)
diff --git 
a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala 
b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
index c353a825503..cd6b8e1d134 100644
--- a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
@@ -55,8 +55,9 @@ object RaftControllerNodeProvider {
     raftManager: RaftManager[ApiMessageAndVersion],
     config: KafkaConfig,
   ): RaftControllerNodeProvider = {
-    val controllerListenerName = new 
ListenerName(config.controllerListenerNames.head)
-    val controllerSecurityProtocol = 
config.effectiveListenerSecurityProtocolMap.getOrElse(controllerListenerName, 
SecurityProtocol.forName(controllerListenerName.value()))
+    val controllerListenerName = new 
ListenerName(config.controllerListenerNames.get(0))
+    val controllerSecurityProtocol = 
Option(config.effectiveListenerSecurityProtocolMap.get(controllerListenerName))
+      .getOrElse(SecurityProtocol.forName(controllerListenerName.value()))
     val controllerSaslMechanism = config.saslMechanismControllerProtocol
     new RaftControllerNodeProvider(
       raftManager,
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 32f51acf77f..7c1b13b798c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -59,6 +59,8 @@ import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, 
DelayedOperationPurgatory, DelayedRemoteListOffsets, 
DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, 
TopicPartitionOperationKey}
 import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, 
DelayedShareFetchPartitionKey}
 import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
+import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
+import 
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation
 import org.apache.kafka.server.util.timer.{SystemTimer, TimerTask}
 import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
 import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, 
LogReadResult, common}
@@ -1054,18 +1056,18 @@ class ReplicaManager(val config: KafkaConfig,
     }
 
     def invokeCallback(
-      verificationErrors: Map[TopicPartition, Errors]
+      verificationErrors: java.util.Map[TopicPartition, Errors]
     ): Unit = {
-      callback((errors ++ verificationErrors, verificationGuards.toMap))
+      callback((errors ++ verificationErrors.asScala, 
verificationGuards.toMap))
     }
 
     addPartitionsToTxnManager.foreach(_.addOrVerifyTransaction(
-      transactionalId = transactionalId,
-      producerId = producerId,
-      producerEpoch = producerEpoch,
-      topicPartitions = verificationGuards.keys.toSeq,
-      callback = invokeCallback,
-      transactionSupportedOperation = transactionSupportedOperation
+      transactionalId,
+      producerId,
+      producerEpoch,
+      verificationGuards.keys.toSeq.asJava,
+      invokeCallback,
+      transactionSupportedOperation
     ))
 
   }
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala 
b/core/src/main/scala/kafka/tools/StorageTool.scala
index 08a29b3d01d..c7b4f28e336 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -126,7 +126,7 @@ object StorageTool extends Logging {
       setClusterId(namespace.getString("cluster_id")).
       setUnstableFeatureVersionsEnabled(config.unstableFeatureVersionsEnabled).
       setIgnoreFormatted(namespace.getBoolean("ignore_formatted")).
-      setControllerListenerName(config.controllerListenerNames.head).
+      setControllerListenerName(config.controllerListenerNames.get(0)).
       setMetadataLogDirectory(config.metadataLogDir)
     Option(namespace.getString("release_version")).foreach(
       releaseVersion => formatter.
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala 
b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 8af81d97a49..31a8aad7c80 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -23,7 +23,6 @@ import java.lang.management.ManagementFactory
 import com.typesafe.scalalogging.Logger
 
 import javax.management.ObjectName
-import scala.collection._
 import scala.collection.Seq
 import org.apache.commons.validator.routines.InetAddressValidator
 import org.apache.kafka.common.Endpoint
@@ -122,7 +121,7 @@ object CoreUtils {
 
   def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T = 
inLock[T](lock.writeLock)(fun)
 
-  def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol]): Seq[Endpoint] = {
+  def listenerListToEndPoints(listeners: String, securityProtocolMap: 
java.util.Map[ListenerName, SecurityProtocol]): Seq[Endpoint] = {
     listenerListToEndPoints(listeners, securityProtocolMap, 
requireDistinctPorts = true)
   }
 
@@ -131,7 +130,7 @@ object CoreUtils {
     require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
   }
 
-  def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[Endpoint] = {
+  def listenerListToEndPoints(listeners: String, securityProtocolMap: 
java.util.Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[Endpoint] = {
     def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
       (inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
         (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
@@ -186,7 +185,7 @@ object CoreUtils {
     }
 
     val endPoints = try {
-      SocketServerConfigs.listenerListToEndPoints(listeners, 
securityProtocolMap.asJava).asScala
+      SocketServerConfigs.listenerListToEndPoints(listeners, 
securityProtocolMap).asScala
     } catch {
       case e: Exception =>
         throw new IllegalArgumentException(s"Error creating broker listeners 
from '$listeners': ${e.getMessage}", e)
diff --git 
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala 
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 85b8e2298ef..6c491e739e3 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -275,7 +275,7 @@ abstract class QuorumTestHarness extends Logging {
     formatter.addDirectory(metadataDir.getAbsolutePath)
     formatter.setReleaseVersion(metadataVersion)
     formatter.setUnstableFeatureVersionsEnabled(true)
-    formatter.setControllerListenerName(config.controllerListenerNames.head)
+    formatter.setControllerListenerName(config.controllerListenerNames.get(0))
     formatter.setMetadataLogDirectory(config.metadataLogDir)
 
     val transactionVersion =
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index 2e9f95beb51..d5dadcfd9f4 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -34,6 +34,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.common.RequestLocal
 import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, 
DelayedOperationPurgatory, DelayedRemoteListOffsets, TopicPartitionOperationKey}
+import 
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation
 import org.apache.kafka.server.util.timer.{MockTimer, Timer}
 import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
 import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, 
UnifiedLog, VerificationGuard}
diff --git 
a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala
index 939d63789ff..84c0e41b724 100644
--- a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala
@@ -31,6 +31,9 @@ import org.apache.kafka.common.requests.{AbstractResponse, 
AddPartitionsToTxnReq
 import org.apache.kafka.common.utils.MockTime
 import org.apache.kafka.metadata.{LeaderAndIsr, MetadataCache}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
+import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
+import 
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.{AppendCallback, 
TransactionSupportedOperation}
+import 
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation.{ADD_PARTITION,
 DEFAULT_ERROR, GENERIC_ERROR_SUPPORTED}
 import org.apache.kafka.server.util.RequestAndCompletionHandler
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -49,7 +52,7 @@ import scala.jdk.CollectionConverters._
 class AddPartitionsToTxnManagerTest {
   private val networkClient: NetworkClient = mock(classOf[NetworkClient])
   private val metadataCache: MetadataCache = mock(classOf[MetadataCache])
-  private val partitionFor: String => Int = mock(classOf[String => Int])
+  private val partitionFor: util.function.Function[String, Integer] = 
mock(classOf[util.function.Function[String, Integer]])
 
   private val time = new MockTime
 
@@ -73,7 +76,7 @@ class AddPartitionsToTxnManagerTest {
   private val authenticationErrorResponse = clientResponse(null, authException 
= new SaslAuthenticationException(""))
   private val versionMismatchResponse = clientResponse(null, mismatchException 
= new UnsupportedVersionException(""))
   private val disconnectedResponse = clientResponse(null, disconnected = true)
-  private val transactionSupportedOperation = genericErrorSupported
+  private val transactionSupportedOperation = GENERIC_ERROR_SUPPORTED
 
   private val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1))
 
@@ -93,14 +96,14 @@ class AddPartitionsToTxnManagerTest {
     addPartitionsToTxnManager.shutdown()
   }
 
-  private def setErrors(errors: mutable.Map[TopicPartition, 
Errors])(callbackErrors: Map[TopicPartition, Errors]): Unit = {
-    callbackErrors.foreachEntry(errors.put)
+  private def setErrors(errors: mutable.Map[TopicPartition, Errors]): 
AppendCallback = {
+    callbackErrors => callbackErrors.forEach((tp, err) => errors.put(tp, err))
   }
 
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
   def testAddTxnData(isAddPartition: Boolean): Unit = {
-    val transactionSupportedOperation = if (isAddPartition) addPartition else 
genericErrorSupported
+    val transactionSupportedOperation = if (isAddPartition) ADD_PARTITION else 
GENERIC_ERROR_SUPPORTED
     when(partitionFor.apply(transactionalId1)).thenReturn(0)
     when(partitionFor.apply(transactionalId2)).thenReturn(1)
     when(partitionFor.apply(transactionalId3)).thenReturn(0)
@@ -111,9 +114,9 @@ class AddPartitionsToTxnManagerTest {
     val transaction2Errors = mutable.Map[TopicPartition, Errors]()
     val transaction3Errors = mutable.Map[TopicPartition, Errors]()
 
-    addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1, 
producerId1, producerEpoch = 0, topicPartitions, setErrors(transaction1Errors), 
transactionSupportedOperation)
-    addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId2, 
producerId2, producerEpoch = 0, topicPartitions, setErrors(transaction2Errors), 
transactionSupportedOperation)
-    addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId3, 
producerId3, producerEpoch = 0, topicPartitions, setErrors(transaction3Errors), 
transactionSupportedOperation)
+    addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1, 
producerId1, 0, topicPartitions.asJava, setErrors(transaction1Errors), 
transactionSupportedOperation)
+    addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId2, 
producerId2, 0, topicPartitions.asJava, setErrors(transaction2Errors), 
transactionSupportedOperation)
+    addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId3, 
producerId3, 0, topicPartitions.asJava, setErrors(transaction3Errors), 
transactionSupportedOperation)
 
     // We will try to add transaction1 3 more times (retries). One will have 
the same epoch, one will have a newer epoch, and one will have an older epoch 
than the new one we just added.
     val transaction1RetryWithSameEpochErrors = mutable.Map[TopicPartition, 
Errors]()
@@ -121,17 +124,17 @@ class AddPartitionsToTxnManagerTest {
     val transaction1RetryWithOldEpochErrors = mutable.Map[TopicPartition, 
Errors]()
 
     // Trying to add more transactional data for the same transactional ID, 
producer ID, and epoch should simply replace the old data and send a retriable 
response.
-    addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1, 
producerId1, producerEpoch = 0, topicPartitions, 
setErrors(transaction1RetryWithSameEpochErrors), transactionSupportedOperation)
+    addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1, 
producerId1, 0, topicPartitions.asJava, 
setErrors(transaction1RetryWithSameEpochErrors), transactionSupportedOperation)
     val expectedNetworkErrors = topicPartitions.map(_ -> 
Errors.NETWORK_EXCEPTION).toMap
     assertEquals(expectedNetworkErrors, transaction1Errors)
 
     // Trying to add more transactional data for the same transactional ID and 
producer ID, but new epoch should replace the old data and send an error 
response for it.
-    addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1, 
producerId1, producerEpoch = 1, topicPartitions, 
setErrors(transaction1RetryWithNewerEpochErrors), transactionSupportedOperation)
+    addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1, 
producerId1, 1, topicPartitions.asJava, 
setErrors(transaction1RetryWithNewerEpochErrors), transactionSupportedOperation)
     val expectedEpochErrors = topicPartitions.map(_ -> 
Errors.INVALID_PRODUCER_EPOCH).toMap
     assertEquals(expectedEpochErrors, transaction1RetryWithSameEpochErrors)
 
     // Trying to add more transactional data for the same transactional ID and 
producer ID, but an older epoch should immediately return with error and keep 
the old data queued to send.
-    addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1, 
producerId1, producerEpoch = 0, topicPartitions, 
setErrors(transaction1RetryWithOldEpochErrors), transactionSupportedOperation)
+    addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1, 
producerId1, 0, topicPartitions.asJava, 
setErrors(transaction1RetryWithOldEpochErrors), transactionSupportedOperation)
     assertEquals(expectedEpochErrors, transaction1RetryWithOldEpochErrors)
 
     val requestsAndHandlers = 
addPartitionsToTxnManager.generateRequests().asScala
@@ -162,12 +165,12 @@ class AddPartitionsToTxnManagerTest {
     mockTransactionStateMetadata(0, 0, Some(node0))
     mockTransactionStateMetadata(1, 1, Some(node1))
     mockTransactionStateMetadata(2, 2, Some(node2))
-    val transactionSupportedOperation = if (isAddPartition) addPartition else 
genericErrorSupported
+    val transactionSupportedOperation = if (isAddPartition) ADD_PARTITION else 
GENERIC_ERROR_SUPPORTED
 
     val transactionErrors = mutable.Map[TopicPartition, Errors]()
 
-    addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1, 
producerId1, producerEpoch = 0, topicPartitions, setErrors(transactionErrors), 
transactionSupportedOperation)
-    addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId2, 
producerId2, producerEpoch = 0, topicPartitions, setErrors(transactionErrors), 
transactionSupportedOperation)
+    addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1, 
producerId1, 0, topicPartitions.asJava, setErrors(transactionErrors), 
transactionSupportedOperation)
+    addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId2, 
producerId2, 0, topicPartitions.asJava, setErrors(transactionErrors), 
transactionSupportedOperation)
 
     val requestsAndHandlers = 
addPartitionsToTxnManager.generateRequests().asScala
     assertEquals(2, requestsAndHandlers.size)
@@ -177,8 +180,8 @@ class AddPartitionsToTxnManagerTest {
       else verifyRequest(node1, transactionalId2, producerId2, 
!isAddPartition, requestAndHandler)
     }
 
-    addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId2, 
producerId2, producerEpoch = 0, topicPartitions, setErrors(transactionErrors), 
transactionSupportedOperation)
-    addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId3, 
producerId3, producerEpoch = 0, topicPartitions, setErrors(transactionErrors), 
transactionSupportedOperation)
+    addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId2, 
producerId2, 0, topicPartitions.asJava, setErrors(transactionErrors), 
transactionSupportedOperation)
+    addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId3, 
producerId3, 0, topicPartitions.asJava, setErrors(transactionErrors), 
transactionSupportedOperation)
 
     // Test creationTimeMs increases too.
     time.sleep(10)
@@ -209,8 +212,8 @@ class AddPartitionsToTxnManagerTest {
       addPartitionsToTxnManager.addOrVerifyTransaction(
         transactionalId1,
         producerId1,
-        producerEpoch = 0,
-        topicPartitions,
+        0,
+        topicPartitions.asJava,
         setErrors(errors),
         transactionSupportedOperation
       )
@@ -245,16 +248,16 @@ class AddPartitionsToTxnManagerTest {
       transaction1Errors.clear()
       transaction2Errors.clear()
 
-      addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1, 
producerId1, producerEpoch = 0, topicPartitions, setErrors(transaction1Errors), 
transactionSupportedOperation)
-      addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId2, 
producerId2, producerEpoch = 0, topicPartitions, setErrors(transaction2Errors), 
transactionSupportedOperation)
+      addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1, 
producerId1, 0, topicPartitions.asJava, setErrors(transaction1Errors), 
transactionSupportedOperation)
+      addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId2, 
producerId2, 0, topicPartitions.asJava, setErrors(transaction2Errors), 
transactionSupportedOperation)
     }
 
     def addTransactionsToVerifyRequestVersion(operationExpected: 
TransactionSupportedOperation): Unit = {
       transaction1Errors.clear()
       transaction2Errors.clear()
 
-      addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1, 
producerId1, producerEpoch = 0, topicPartitions, setErrors(transaction1Errors), 
operationExpected)
-      addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId2, 
producerId2, producerEpoch = 0, topicPartitions, setErrors(transaction2Errors), 
operationExpected)
+      addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1, 
producerId1, 0, topicPartitions.asJava, setErrors(transaction1Errors), 
operationExpected)
+      addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId2, 
producerId2, 0, topicPartitions.asJava, setErrors(transaction2Errors), 
operationExpected)
     }
 
     val expectedAuthErrors = topicPartitions.map(_ -> 
Errors.SASL_AUTHENTICATION_FAILED).toMap
@@ -318,12 +321,12 @@ class AddPartitionsToTxnManagerTest {
     val expectedTransactionAbortableErrorsTxn1HigherVersion = 
topicPartitions.map(_ -> Errors.TRANSACTION_ABORTABLE).toMap
     val expectedTransactionAbortableErrorsTxn2HigherVersion = Map(new 
TopicPartition("foo", 2) -> Errors.TRANSACTION_ABORTABLE)
 
-    addTransactionsToVerifyRequestVersion(defaultError)
+    addTransactionsToVerifyRequestVersion(DEFAULT_ERROR)
     receiveResponse(mixedAbortableErrorsResponse)
     assertEquals(expectedTransactionAbortableErrorsTxn1LowerVersion, 
transaction1Errors)
     assertEquals(expectedTransactionAbortableErrorsTxn2LowerVersion, 
transaction2Errors)
 
-    addTransactionsToVerifyRequestVersion(genericErrorSupported)
+    addTransactionsToVerifyRequestVersion(GENERIC_ERROR_SUPPORTED)
     receiveResponse(mixedAbortableErrorsResponse)
     assertEquals(expectedTransactionAbortableErrorsTxn1HigherVersion, 
transaction1Errors)
     assertEquals(expectedTransactionAbortableErrorsTxn2HigherVersion, 
transaction2Errors)
@@ -351,8 +354,8 @@ class AddPartitionsToTxnManagerTest {
     }
 
     val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup], 
(mock: KafkaMetricsGroup, context: Context) => {
-      
when(mock.newMeter(ArgumentMatchers.eq(AddPartitionsToTxnManager.VerificationFailureRateMetricName),
 anyString(), any(classOf[TimeUnit]))).thenReturn(mockVerificationFailureMeter)
-      
when(mock.newHistogram(ArgumentMatchers.eq(AddPartitionsToTxnManager.VerificationTimeMsMetricName))).thenReturn(mockVerificationTime)
+      
when(mock.newMeter(ArgumentMatchers.eq(AddPartitionsToTxnManager.VERIFICATION_FAILURE_RATE_METRIC_NAME),
 anyString(), any(classOf[TimeUnit]))).thenReturn(mockVerificationFailureMeter)
+      
when(mock.newHistogram(ArgumentMatchers.eq(AddPartitionsToTxnManager.VERIFICATION_TIME_MS_METRIC_NAME))).thenReturn(mockVerificationTime)
     })
 
     val addPartitionsManagerWithMockedMetrics = new AddPartitionsToTxnManager(
@@ -364,8 +367,8 @@ class AddPartitionsToTxnManagerTest {
     )
 
     try {
-      
addPartitionsManagerWithMockedMetrics.addOrVerifyTransaction(transactionalId1, 
producerId1, producerEpoch = 0, topicPartitions, setErrors(transactionErrors), 
transactionSupportedOperation)
-      
addPartitionsManagerWithMockedMetrics.addOrVerifyTransaction(transactionalId2, 
producerId2, producerEpoch = 0, topicPartitions, setErrors(transactionErrors), 
transactionSupportedOperation)
+      
addPartitionsManagerWithMockedMetrics.addOrVerifyTransaction(transactionalId1, 
producerId1, 0, topicPartitions.asJava, setErrors(transactionErrors), 
transactionSupportedOperation)
+      
addPartitionsManagerWithMockedMetrics.addOrVerifyTransaction(transactionalId2, 
producerId2, 0, topicPartitions.asJava, setErrors(transactionErrors), 
transactionSupportedOperation)
 
       time.sleep(100)
 
@@ -386,10 +389,10 @@ class AddPartitionsToTxnManagerTest {
 
       val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
 
-      
verify(mockMetricsGroup).newMeter(ArgumentMatchers.eq(AddPartitionsToTxnManager.VerificationFailureRateMetricName),
 anyString(), any(classOf[TimeUnit]))
-      
verify(mockMetricsGroup).newHistogram(ArgumentMatchers.eq(AddPartitionsToTxnManager.VerificationTimeMsMetricName))
-      
verify(mockMetricsGroup).removeMetric(AddPartitionsToTxnManager.VerificationFailureRateMetricName)
-      
verify(mockMetricsGroup).removeMetric(AddPartitionsToTxnManager.VerificationTimeMsMetricName)
+      
verify(mockMetricsGroup).newMeter(ArgumentMatchers.eq(AddPartitionsToTxnManager.VERIFICATION_FAILURE_RATE_METRIC_NAME),
 anyString(), any(classOf[TimeUnit]))
+      
verify(mockMetricsGroup).newHistogram(ArgumentMatchers.eq(AddPartitionsToTxnManager.VERIFICATION_TIME_MS_METRIC_NAME))
+      
verify(mockMetricsGroup).removeMetric(AddPartitionsToTxnManager.VERIFICATION_FAILURE_RATE_METRIC_NAME)
+      
verify(mockMetricsGroup).removeMetric(AddPartitionsToTxnManager.VERIFICATION_TIME_MS_METRIC_NAME)
 
       // assert that we have verified all invocations on the metrics group.
       verifyNoMoreInteractions(mockMetricsGroup)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 77e8449199c..4f269d5ed7c 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -441,7 +441,7 @@ class KafkaConfigTest {
     props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
     props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
     val controllerListenerName = new ListenerName("CONTROLLER")
-    assertEquals(Some(SecurityProtocol.PLAINTEXT),
+    assertEquals(SecurityProtocol.PLAINTEXT,
       
KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(controllerListenerName))
     // ensure we don't map it to PLAINTEXT when there is a SSL or SASL 
controller listener
     props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"CONTROLLER,SSL")
@@ -454,7 +454,7 @@ class KafkaConfigTest {
     props.remove(SocketServerConfigs.LISTENERS_CONFIG)
     // ensure we don't map it to PLAINTEXT when it is explicitly mapped 
otherwise
     
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, 
"PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
-    assertEquals(Some(SecurityProtocol.SSL),
+    assertEquals(SecurityProtocol.SSL,
       
KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(controllerListenerName))
     // ensure we don't map it to PLAINTEXT when anything is explicitly given
     // (i.e. it is only part of the default value, even with KRaft)
@@ -463,7 +463,7 @@ class KafkaConfigTest {
     // ensure we can map it to a non-PLAINTEXT security protocol by default 
(i.e. when nothing is given)
     props.remove(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)
     props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
-    assertEquals(Some(SecurityProtocol.SSL),
+    assertEquals(SecurityProtocol.SSL,
       
KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new 
ListenerName("SSL")))
   }
 
@@ -475,9 +475,9 @@ class KafkaConfigTest {
     props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"CONTROLLER1,CONTROLLER2")
     props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
     props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9092")
-    assertEquals(Some(SecurityProtocol.PLAINTEXT),
+    assertEquals(SecurityProtocol.PLAINTEXT,
       
KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new 
ListenerName("CONTROLLER1")))
-    assertEquals(Some(SecurityProtocol.PLAINTEXT),
+    assertEquals(SecurityProtocol.PLAINTEXT,
       
KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new 
ListenerName("CONTROLLER2")))
   }
 
@@ -511,11 +511,11 @@ class KafkaConfigTest {
       new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9093))
     assertEquals(expectedListeners, config.listeners)
     assertEquals(expectedListeners, config.effectiveAdvertisedBrokerListeners)
-    val expectedSecurityProtocolMap = Map(
-      new ListenerName("CLIENT") -> SecurityProtocol.SSL,
-      new ListenerName("REPLICATION") -> SecurityProtocol.SSL,
-      new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT,
-      new ListenerName("CONTROLLER") -> SecurityProtocol.PLAINTEXT
+    val expectedSecurityProtocolMap = util.Map.of(
+      new ListenerName("CLIENT"), SecurityProtocol.SSL,
+      new ListenerName("REPLICATION"), SecurityProtocol.SSL,
+      new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT,
+      new ListenerName("CONTROLLER"), SecurityProtocol.PLAINTEXT
     )
     assertEquals(expectedSecurityProtocolMap, 
config.effectiveListenerSecurityProtocolMap)
   }
@@ -546,10 +546,10 @@ class KafkaConfigTest {
     )
     assertEquals(expectedAdvertisedListeners, 
config.effectiveAdvertisedBrokerListeners)
 
-    val expectedSecurityProtocolMap = Map(
-      new ListenerName("EXTERNAL") -> SecurityProtocol.SSL,
-      new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT,
-      new ListenerName("CONTROLLER") -> SecurityProtocol.PLAINTEXT
+    val expectedSecurityProtocolMap = util.Map.of(
+      new ListenerName("EXTERNAL"), SecurityProtocol.SSL,
+      new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT,
+      new ListenerName("CONTROLLER"), SecurityProtocol.PLAINTEXT
     )
     assertEquals(expectedSecurityProtocolMap, 
config.effectiveListenerSecurityProtocolMap)
   }
@@ -597,7 +597,7 @@ class KafkaConfigTest {
   }
 
   private def listenerListToEndPoints(listenerList: String,
-                              securityProtocolMap: 
collection.Map[ListenerName, SecurityProtocol] = 
SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO.asScala) =
+                              securityProtocolMap: util.Map[ListenerName, 
SecurityProtocol] = SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO) =
     CoreUtils.listenerListToEndPoints(listenerList, securityProtocolMap)
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index b19de2dde1a..cdae51bb935 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -69,6 +69,9 @@ import org.apache.kafka.server.share.SharePartitionKey
 import org.apache.kafka.server.share.fetch.{DelayedShareFetchGroupKey, 
DelayedShareFetchKey, ShareFetch}
 import org.apache.kafka.server.share.metrics.ShareGroupMetrics
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, 
FetchPartitionData}
+import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
+import 
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation
+import 
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation.{ADD_PARTITION,
 GENERIC_ERROR_SUPPORTED}
 import org.apache.kafka.server.util.timer.MockTimer
 import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
 import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
@@ -155,7 +158,7 @@ class ReplicaManagerTest {
     // Anytime we try to verify, just automatically run the callback as though 
the transaction was verified.
     when(addPartitionsToTxnManager.addOrVerifyTransaction(any(), any(), any(), 
any(), any(), any())).thenAnswer { invocationOnMock =>
       val callback = invocationOnMock.getArgument(4, 
classOf[AddPartitionsToTxnManager.AppendCallback])
-      callback(Map.empty[TopicPartition, Errors].toMap)
+      callback.complete(util.Map.of())
     }
     // make sure metadataCache can map between topic name and id
     setupMetadataCacheWithTopicIds(topicIds, metadataCache)
@@ -2195,7 +2198,7 @@ class ReplicaManagerTest {
         ArgumentMatchers.eq(transactionalId),
         ArgumentMatchers.eq(producerId),
         ArgumentMatchers.eq(producerEpoch),
-        ArgumentMatchers.eq(Seq(tp0)),
+        ArgumentMatchers.eq(util.List.of(tp0)),
         any[AddPartitionsToTxnManager.AppendCallback](),
         any()
       )
@@ -2232,7 +2235,7 @@ class ReplicaManagerTest {
         ArgumentMatchers.eq(transactionalId),
         ArgumentMatchers.eq(producerId),
         ArgumentMatchers.eq(producerEpoch),
-        ArgumentMatchers.eq(Seq(tp0)),
+        ArgumentMatchers.eq(util.List.of(tp0)),
         appendCallback.capture(),
         any()
       )
@@ -2241,7 +2244,7 @@ class ReplicaManagerTest {
 
       // Confirm we did not write to the log and instead returned error.
       val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue
-      callback(Map(tp0 -> Errors.INVALID_TXN_STATE).toMap)
+      callback.complete(util.Map.of(tp0, Errors.INVALID_TXN_STATE))
       assertEquals(Errors.INVALID_TXN_STATE, result.assertFired.error)
       assertEquals(verificationGuard, getVerificationGuard(replicaManager, 
tp0, producerId))
 
@@ -2252,14 +2255,14 @@ class ReplicaManagerTest {
         ArgumentMatchers.eq(transactionalId),
         ArgumentMatchers.eq(producerId),
         ArgumentMatchers.eq(producerEpoch),
-        ArgumentMatchers.eq(Seq(tp0)),
+        ArgumentMatchers.eq(util.List.of(tp0)),
         appendCallback2.capture(),
         any()
       )
       assertEquals(verificationGuard, getVerificationGuard(replicaManager, 
tp0, producerId))
 
       val callback2: AddPartitionsToTxnManager.AppendCallback = 
appendCallback2.getValue
-      callback2(Map.empty[TopicPartition, Errors].toMap)
+      callback2.complete(util.Map.of())
       assertEquals(VerificationGuard.SENTINEL, 
getVerificationGuard(replicaManager, tp0, producerId))
       
assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId, 
producerEpoch))
     } finally {
@@ -2295,13 +2298,13 @@ class ReplicaManagerTest {
 
       // We should add these partitions to the manager to verify.
       val result = handleProduceAppend(replicaManager, tp0, 
transactionalRecords, origin = AppendOrigin.CLIENT,
-        transactionalId = transactionalId, transactionSupportedOperation = 
addPartition)
+        transactionalId = transactionalId, transactionSupportedOperation = 
ADD_PARTITION)
       val appendCallback = 
ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
       verify(addPartitionsToTxnManager, times(1)).addOrVerifyTransaction(
         ArgumentMatchers.eq(transactionalId),
         ArgumentMatchers.eq(producerId),
         ArgumentMatchers.eq(producerEpoch),
-        ArgumentMatchers.eq(Seq(tp0)),
+        ArgumentMatchers.eq(util.List.of(tp0)),
         appendCallback.capture(),
         any()
       )
@@ -2310,7 +2313,7 @@ class ReplicaManagerTest {
 
       // Confirm we did not write to the log and instead returned error.
       var callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue
-      callback(Map(tp0 -> error).toMap)
+      callback.complete(util.Map.of(tp0, error))
 
       if (error != Errors.CONCURRENT_TRANSACTIONS) {
         // NOT_COORDINATOR is converted to NOT_ENOUGH_REPLICAS
@@ -2327,12 +2330,12 @@ class ReplicaManagerTest {
           ArgumentMatchers.eq(transactionalId),
           ArgumentMatchers.eq(producerId),
           ArgumentMatchers.eq(producerEpoch),
-          ArgumentMatchers.eq(Seq(tp0)),
+          ArgumentMatchers.eq(util.List.of(tp0)),
           appendCallback.capture(),
           any()
         )
         callback = appendCallback.getValue
-        callback(Map.empty[TopicPartition, Errors].toMap)
+        callback.complete(util.Map.of())
         assertEquals(VerificationGuard.SENTINEL, 
getVerificationGuard(replicaManager, tp0, producerId))
         
assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId, 
producerEpoch))
       }
@@ -2366,7 +2369,7 @@ class ReplicaManagerTest {
         ArgumentMatchers.eq(transactionalId),
         ArgumentMatchers.eq(producerId),
         ArgumentMatchers.eq(producerEpoch),
-        ArgumentMatchers.eq(Seq(tp0)),
+        ArgumentMatchers.eq(util.List.of(tp0)),
         appendCallback.capture(),
         any()
       )
@@ -2375,7 +2378,7 @@ class ReplicaManagerTest {
 
       // Confirm we did not write to the log and instead returned error.
       val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue
-      callback(Map(tp0 -> Errors.INVALID_PRODUCER_ID_MAPPING).toMap)
+      callback.complete(util.Map.of(tp0, Errors.INVALID_PRODUCER_ID_MAPPING))
       assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, 
result.assertFired.error)
       assertEquals(verificationGuard, getVerificationGuard(replicaManager, 
tp0, producerId))
 
@@ -2389,7 +2392,7 @@ class ReplicaManagerTest {
         ArgumentMatchers.eq(transactionalId),
         ArgumentMatchers.eq(producerId),
         ArgumentMatchers.eq(producerEpoch),
-        ArgumentMatchers.eq(Seq(tp0)),
+        ArgumentMatchers.eq(util.List.of(tp0)),
         appendCallback2.capture(),
         any()
       )
@@ -2397,7 +2400,7 @@ class ReplicaManagerTest {
 
       // Verification should succeed, but we expect to fail with 
OutOfOrderSequence and for the VerificationGuard to remain.
       val callback2: AddPartitionsToTxnManager.AppendCallback = 
appendCallback2.getValue
-      callback2(Map.empty[TopicPartition, Errors].toMap)
+      callback2.complete(util.Map.of())
       assertEquals(verificationGuard, getVerificationGuard(replicaManager, 
tp0, producerId))
       assertEquals(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 
result2.assertFired.error)
     } finally {
@@ -2445,7 +2448,7 @@ class ReplicaManagerTest {
         ArgumentMatchers.eq(transactionalId),
         ArgumentMatchers.eq(producerId),
         ArgumentMatchers.eq(producerEpoch),
-        ArgumentMatchers.eq(Seq(tp0)),
+        ArgumentMatchers.eq(util.List.of(tp0)),
         appendCallback.capture(),
         any()
       )
@@ -2455,7 +2458,7 @@ class ReplicaManagerTest {
 
       // simulate successful verification
       val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue
-      callback(Map.empty[TopicPartition, Errors].toMap)
+      callback.complete(util.Map.of())
 
       assertEquals(VerificationGuard.SENTINEL, 
getVerificationGuard(replicaManager, tp0, producerId))
       
assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId, 
producerEpoch))
@@ -2652,7 +2655,7 @@ class ReplicaManagerTest {
         ArgumentMatchers.eq(transactionalId),
         ArgumentMatchers.eq(producerId),
         ArgumentMatchers.eq(producerEpoch),
-        ArgumentMatchers.eq(Seq(tp0)),
+        ArgumentMatchers.eq(util.List.of(tp0)),
         appendCallback.capture(),
         any()
       )
@@ -2669,7 +2672,7 @@ class ReplicaManagerTest {
 
       // Confirm we did not write to the log and instead returned error.
       val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue
-      callback(Map(tp0 -> Errors.INVALID_TXN_STATE).toMap)
+      callback.complete(util.Map.of(tp0, Errors.INVALID_TXN_STATE))
       assertEquals(Errors.INVALID_TXN_STATE, result.assertFired.error)
       assertEquals(verificationGuard, getVerificationGuard(replicaManager, 
tp0, producerId))
 
@@ -2711,20 +2714,20 @@ class ReplicaManagerTest {
 
       // Start verification and return the coordinator related errors.
       val expectedMessage = s"Unable to verify the partition has been added to 
the transaction. Underlying error: ${error.toString}"
-      val result = handleProduceAppend(replicaManager, tp0, 
transactionalRecords, transactionalId = transactionalId, 
transactionSupportedOperation = addPartition)
+      val result = handleProduceAppend(replicaManager, tp0, 
transactionalRecords, transactionalId = transactionalId, 
transactionSupportedOperation = ADD_PARTITION)
       val appendCallback = 
ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
       verify(addPartitionsToTxnManager, times(1)).addOrVerifyTransaction(
         ArgumentMatchers.eq(transactionalId),
         ArgumentMatchers.eq(producerId),
         ArgumentMatchers.eq(producerEpoch),
-        ArgumentMatchers.eq(Seq(tp0)),
+        ArgumentMatchers.eq(util.List.of(tp0)),
         appendCallback.capture(),
         any()
       )
 
       // Confirm we did not write to the log and instead returned the 
converted error with the correct error message.
       val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue
-      callback(Map(tp0 -> error).toMap)
+      callback.complete(util.Map.of(tp0, error))
       assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
       assertEquals(expectedMessage, result.assertFired.errorMessage)
     } finally {
@@ -2767,14 +2770,14 @@ class ReplicaManagerTest {
         ArgumentMatchers.eq(transactionalId),
         ArgumentMatchers.eq(producerId),
         ArgumentMatchers.eq(producerEpoch),
-        ArgumentMatchers.eq(Seq(tp0)),
+        ArgumentMatchers.eq(util.List.of(tp0)),
         appendCallback.capture(),
         any()
       )
 
       // Confirm we did not write to the log and instead returned the 
converted error with the correct error message.
       val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue
-      callback(Map(tp0 -> error).toMap)
+      callback.complete(util.Map.of(tp0, error))
       assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
       assertEquals(expectedMessage, result.assertFired.errorMessage)
     } finally {
@@ -2798,7 +2801,7 @@ class ReplicaManagerTest {
         ArgumentMatchers.eq(transactionalId),
         ArgumentMatchers.eq(producerId),
         ArgumentMatchers.eq(producerEpoch),
-        ArgumentMatchers.eq(Seq(tp0)),
+        ArgumentMatchers.eq(util.List.of(tp0)),
         appendCallback.capture(),
         any()
       )
@@ -3088,7 +3091,7 @@ class ReplicaManagerTest {
                                                   entriesToAppend: 
Map[TopicPartition, MemoryRecords],
                                                   transactionalId: String,
                                                   requiredAcks: Short = -1,
-                                                  
transactionSupportedOperation: TransactionSupportedOperation = 
genericErrorSupported
+                                                  
transactionSupportedOperation: TransactionSupportedOperation = 
GENERIC_ERROR_SUPPORTED
                                                  ): 
CallbackResult[Map[TopicIdPartition, PartitionResponse]] = {
     val result = new CallbackResult[Map[TopicIdPartition, PartitionResponse]]()
     def appendCallback(responses: Map[TopicIdPartition, PartitionResponse]): 
Unit = {
@@ -3115,7 +3118,7 @@ class ReplicaManagerTest {
                                   origin: AppendOrigin = AppendOrigin.CLIENT,
                                   requiredAcks: Short = -1,
                                   transactionalId: String,
-                                  transactionSupportedOperation: 
TransactionSupportedOperation = genericErrorSupported
+                                  transactionSupportedOperation: 
TransactionSupportedOperation = GENERIC_ERROR_SUPPORTED
                                  ): CallbackResult[PartitionResponse] = {
     val result = new CallbackResult[PartitionResponse]()
 
@@ -3148,7 +3151,7 @@ class ReplicaManagerTest {
                                                             producerId: Long,
                                                             producerEpoch: 
Short,
                                                             baseSequence: Int 
= 0,
-                                                            
transactionSupportedOperation: TransactionSupportedOperation = 
genericErrorSupported
+                                                            
transactionSupportedOperation: TransactionSupportedOperation = 
GENERIC_ERROR_SUPPORTED
                                                            ): 
CallbackResult[Either[Errors, VerificationGuard]] = {
     val result = new CallbackResult[Either[Errors, VerificationGuard]]()
     def postVerificationCallback(errorAndGuard: (Errors, VerificationGuard)): 
Unit = {
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index 0cb4be79f2a..16d61722727 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++ 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -16,9 +16,13 @@
  */
 package org.apache.kafka.server.config;
 
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
@@ -36,8 +40,10 @@ import org.apache.kafka.storage.internals.log.CleanerConfig;
 import org.apache.kafka.storage.internals.log.LogConfig;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 /**
  * During moving {@link kafka.server.KafkaConfig} out of core 
AbstractKafkaConfig will be the future KafkaConfig
@@ -91,4 +97,113 @@ public abstract class AbstractKafkaConfig extends 
AbstractConfig {
     public int backgroundThreads() {
         return getInt(ServerConfigs.BACKGROUND_THREADS_CONFIG);
     }
+
+    /**
+     * Returns the integer value stored under {@link ServerConfigs#BROKER_ID_CONFIG}.
+     */
+    public int brokerId() {
+        final int configuredBrokerId = getInt(ServerConfigs.BROKER_ID_CONFIG);
+        return configuredBrokerId;
+    }
+
+    /**
+     * Returns the integer value stored under {@link ServerConfigs#REQUEST_TIMEOUT_MS_CONFIG}.
+     */
+    public int requestTimeoutMs() {
+        final int configuredTimeoutMs = getInt(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG);
+        return configuredTimeoutMs;
+    }
+
+    /**
+     * Returns the controller listener names: the string stored under
+     * {@link KRaftConfigs#CONTROLLER_LISTENER_NAMES_CONFIG}, parsed with {@code Csv.parseCsvList}.
+     */
+    public List<String> controllerListenerNames() {
+        final String rawListenerNames = getString(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG);
+        return Csv.parseCsvList(rawListenerNames);
+    }
+
+    /**
+     * Returns the listener-name half of the resolved inter-broker listener/protocol pair.
+     */
+    public ListenerName interBrokerListenerName() {
+        final Map.Entry<ListenerName, SecurityProtocol> resolved = interBrokerListenerNameAndSecurityProtocol();
+        return resolved.getKey();
+    }
+
+    /**
+     * Returns the security-protocol half of the resolved inter-broker listener/protocol pair.
+     */
+    public SecurityProtocol interBrokerSecurityProtocol() {
+        final Map.Entry<ListenerName, SecurityProtocol> resolved = interBrokerListenerNameAndSecurityProtocol();
+        return resolved.getValue();
+    }
+
+    public Map<ListenerName, SecurityProtocol> 
effectiveListenerSecurityProtocolMap() {
+        Map<ListenerName, SecurityProtocol> mapValue =
+                
getMap(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
+                        
getString(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG))
+                        .entrySet()
+                        .stream()
+                        .collect(Collectors.toMap(
+                                e -> ListenerName.normalised(e.getKey()),
+                                e -> securityProtocol(
+                                        e.getValue(),
+                                        
SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)));
+
+        if 
(!originals().containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG))
 {
+            // Using the default configuration since 
listener.security.protocol.map is not explicitly set.
+            // Before adding default PLAINTEXT mappings for controller 
listeners, verify that:
+            // 1. No SSL or SASL protocols are used in controller listeners
+            // 2. No SSL or SASL protocols are used in regular listeners 
(Note: controller listeners
+            //    are not included in 'listeners' config when 
process.roles=broker)
+            if 
(controllerListenerNames().stream().anyMatch(AbstractKafkaConfig::isSslOrSasl) 
||
+                    
Csv.parseCsvList(getString(SocketServerConfigs.LISTENERS_CONFIG)).stream()
+                            .anyMatch(listenerName -> 
isSslOrSasl(parseListenerName(listenerName)))) {
+                return mapValue;
+            } else {
+                // Add the PLAINTEXT mappings for all controller listener 
names that are not explicitly PLAINTEXT
+                mapValue.putAll(controllerListenerNames().stream()
+                        .filter(listenerName -> 
!SecurityProtocol.PLAINTEXT.name.equals(listenerName))
+                        .collect(Collectors.toMap(ListenerName::new, ignored 
-> SecurityProtocol.PLAINTEXT)));
+                return mapValue;
+            }
+        } else {
+            return mapValue;
+        }
+    }
+
+    /**
+     * Parses a configuration value into a map using {@code Csv.parseCsvMap}.
+     *
+     * @param propName  name of the property; used only to build the error message
+     * @param propValue raw property value to parse
+     * @return the parsed key/value pairs
+     * @throws IllegalArgumentException if parsing fails; the original exception is attached as the cause
+     */
+    public static Map<String, String> getMap(String propName, String propValue) {
+        try {
+            return Csv.parseCsvMap(propValue);
+        } catch (Exception e) {
+            // Chain the original exception so the parser's stack trace is not lost.
+            throw new IllegalArgumentException(
+                    String.format("Error parsing configuration property '%s': %s", propName, e.getMessage()), e);
+        }
+    }
+
+    private static SecurityProtocol securityProtocol(String protocolName, 
String configName) {
+        try {
+            return SecurityProtocol.forName(protocolName);
+        } catch (IllegalArgumentException e) {
+            throw new ConfigException(
+                    String.format("Invalid security protocol `%s` defined in 
%s", protocolName, configName));
+        }
+    }
+
+    private Map.Entry<ListenerName, SecurityProtocol> 
interBrokerListenerNameAndSecurityProtocol() {
+        String interBrokerListenerName = 
getString(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG);
+        if (interBrokerListenerName != null) {
+            if 
(originals().containsKey(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG))
 {
+                throw new ConfigException(String.format("Only one of %s and %s 
should be set.",
+                        ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG,
+                        
ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG));
+            }
+            ListenerName listenerName = 
ListenerName.normalised(interBrokerListenerName);
+            SecurityProtocol securityProtocol = 
effectiveListenerSecurityProtocolMap().get(listenerName);
+            if (securityProtocol == null) {
+                throw new ConfigException("Listener with name " + 
listenerName.value() + " defined in " +
+                        ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG + 
" not found in " +
+                        
SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG + ".");
+            }
+            return Map.entry(listenerName, securityProtocol);
+        } else {
+            SecurityProtocol securityProtocol = securityProtocol(
+                    
getString(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG),
+                    ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG);
+            return 
Map.entry(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol);
+        }
+    }
+
+    /**
+     * Tells whether {@code name} equals the name of the SSL, SASL_SSL or SASL_PLAINTEXT protocol.
+     */
+    private static boolean isSslOrSasl(String name) {
+        if (name.equals(SecurityProtocol.SSL.name)) {
+            return true;
+        }
+        if (name.equals(SecurityProtocol.SASL_SSL.name)) {
+            return true;
+        }
+        return name.equals(SecurityProtocol.SASL_PLAINTEXT.name);
+    }
+
+    private static String parseListenerName(String connectionString) {
+        int firstColon = connectionString.indexOf(':');
+        if (firstColon < 0) {
+            throw new KafkaException("Unable to parse a listener name from " + 
connectionString);
+        }
+        return connectionString.substring(0, 
firstColon).toUpperCase(Locale.ROOT);
+    }
 }
diff --git 
a/server/src/main/java/org/apache/kafka/server/transaction/AddPartitionsToTxnManager.java
 
b/server/src/main/java/org/apache/kafka/server/transaction/AddPartitionsToTxnManager.java
new file mode 100644
index 00000000000..b0db7c823f7
--- /dev/null
+++ 
b/server/src/main/java/org/apache/kafka/server/transaction/AddPartitionsToTxnManager.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.transaction;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Topic;
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTransaction;
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTransactionCollection;
+import org.apache.kafka.common.message.AddPartitionsToTxnResponseData;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.MetadataCache;
+import org.apache.kafka.server.config.AbstractKafkaConfig;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.server.util.InterBrokerSendThread;
+import org.apache.kafka.server.util.RequestAndCompletionHandler;
+
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation.ADD_PARTITION;
+import static 
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation.DEFAULT_ERROR;
+import static 
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation.GENERIC_ERROR_SUPPORTED;
+
+public class AddPartitionsToTxnManager extends InterBrokerSendThread {
+
+    public static final String VERIFICATION_FAILURE_RATE_METRIC_NAME = 
"VerificationFailureRate";
+    public static final String VERIFICATION_TIME_MS_METRIC_NAME = 
"VerificationTimeMs";
+
+    /**
+     * Describes how the partition response is handled, based on the request version and the exact operation.
+     */
+    public enum TransactionSupportedOperation {
+        /**
+         * The default workflow. It applies when the Produce request version or the TxnOffsetCommit
+         * request version is lower than the first version supporting the new error class.
+         */
+        DEFAULT_ERROR(false),
+        /**
+         * Applies when the client has been updated to handle the TransactionAbortableException.
+         */
+        GENERIC_ERROR_SUPPORTED(false),
+        /**
+         * Allows the partition to be added to the in-flight transaction by Produce and TxnOffsetCommit
+         * requests, in addition to the behaviors of {@link #GENERIC_ERROR_SUPPORTED}.
+         */
+        ADD_PARTITION(true);
+
+        // Among the constants declared here, only ADD_PARTITION sets this flag to true.
+        public final boolean supportsEpochBump;
+
+        TransactionSupportedOperation(boolean supportsEpochBump) {
+            this.supportsEpochBump = supportsEpochBump;
+        }
+    }
+
+    /**
+     * Callback that receives a map of per-partition errors.
+     * NOTE(review): an empty map appears to be what callers pass when there is nothing to report
+     * (the version-mismatch branch of the response handler passes {@code Map.of()}) -- confirm.
+     */
+    @FunctionalInterface
+    public interface AppendCallback {
+        /**
+         * @param partitionErrors map from topic partition to the error associated with it
+         */
+        void complete(Map<TopicPartition, Errors> partitionErrors);
+    }
+
+    /**
+     * Maps a Produce request version to the transaction operation level it supports.
+     *
+     * @param version the Produce request version
+     * @return {@code ADD_PARTITION} for versions above 11, {@code GENERIC_ERROR_SUPPORTED} for
+     *         version 11, and {@code DEFAULT_ERROR} for anything lower
+     */
+    public static TransactionSupportedOperation produceRequestVersionToTransactionSupportedOperation(short version) {
+        if (version <= 10) {
+            return DEFAULT_ERROR;
+        }
+        return version == 11 ? GENERIC_ERROR_SUPPORTED : ADD_PARTITION;
+    }
+
+    /**
+     * Maps a TxnOffsetCommit request version to the transaction operation level it supports.
+     *
+     * @param version the TxnOffsetCommit request version
+     * @return {@code ADD_PARTITION} for versions above 4, {@code GENERIC_ERROR_SUPPORTED} for
+     *         version 4, and {@code DEFAULT_ERROR} for anything lower
+     */
+    public static TransactionSupportedOperation txnOffsetCommitRequestVersionToTransactionSupportedOperation(int version) {
+        if (version <= 3) {
+            return DEFAULT_ERROR;
+        }
+        return version == 4 ? GENERIC_ERROR_SUPPORTED : ADD_PARTITION;
+    }
+
+    /*
+     * Data structure holding the transactional data to send to a node.
+     *
+     * At most one request per transactional ID exists in the map at a time. If a transactional ID is
+     * already in the map and a new request with the same ID comes in, one request stays in the map and
+     * the other is returned to the producer with a response that depends on the epoch.
+     *
+     * NOTE(review): callbacks and startTimeMs appear to be keyed by transactional ID (the response
+     * handler looks up startTimeMs with the key it iterates from callbacks) -- confirm.
+     */
+    public record TransactionDataAndCallbacks(
+            AddPartitionsToTxnTransactionCollection transactionData,
+            Map<String, AppendCallback> callbacks,
+            Map<String, Long> startTimeMs,
+            TransactionSupportedOperation transactionSupportedOperation) { }
+
+    private class AddPartitionsToTxnHandler implements 
RequestCompletionHandler {
+        private final Node node;
+        private final TransactionDataAndCallbacks transactionDataAndCallbacks;
+
+        public AddPartitionsToTxnHandler(Node node, 
TransactionDataAndCallbacks transactionDataAndCallbacks) {
+            this.node = node;
+            this.transactionDataAndCallbacks = transactionDataAndCallbacks;
+        }
+
+        @Override
+        public void onComplete(ClientResponse response) {
+            // Note: Synchronization is not needed on inflightNodes since it 
is always accessed from this thread.
+            inflightNodes.remove(node);
+            if (response.authenticationException() != null) {
+                log.error("AddPartitionsToTxnRequest failed for node {} with 
an authentication exception.", response.destination(), 
response.authenticationException());
+                
sendCallbacksToAll(Errors.forException(response.authenticationException()).code());
+            } else if (response.versionMismatch() != null) {
+                // We may see unsupported version exception if we try to send 
a verify only request to a broker that can't handle it.
+                // In this case, skip verification.
+                log.warn("AddPartitionsToTxnRequest failed for node {} with 
invalid version exception. " +
+                        "This suggests verification is not supported. 
Continuing handling the produce request.", response.destination());
+                transactionDataAndCallbacks.callbacks().forEach((txnId, 
callback) ->
+                        sendCallback(callback, Map.of(), 
transactionDataAndCallbacks.startTimeMs.get(txnId)));
+            } else if (response.wasDisconnected() || response.wasTimedOut()) {
+                log.warn("AddPartitionsToTxnRequest failed for node {} with a 
network exception.", response.destination());
+                sendCallbacksToAll(Errors.NETWORK_EXCEPTION.code());
+            } else {
+                AddPartitionsToTxnResponseData responseData = 
((AddPartitionsToTxnResponse) response.responseBody()).data();
+                if (responseData.errorCode() != 0) {
+                    log.error("AddPartitionsToTxnRequest for node {} returned 
with error {}.",
+                            response.destination(), 
Errors.forCode(responseData.errorCode()));
+                    // The client should not be exposed to 
CLUSTER_AUTHORIZATION_FAILED so modify the error to signify the verification 
did not complete.
+                    // Return INVALID_TXN_STATE.
+                    short finalError = responseData.errorCode() == 
Errors.CLUSTER_AUTHORIZATION_FAILED.code()
+                            ? Errors.INVALID_TXN_STATE.code() : 
responseData.errorCode();
+                    sendCallbacksToAll(finalError);
+                } else {
+                    for 
(AddPartitionsToTxnResponseData.AddPartitionsToTxnResult txnResult : 
responseData.resultsByTransaction()) {
+                        Map<TopicPartition, Errors> unverified = new 
HashMap<>();
+                        for 
(AddPartitionsToTxnResponseData.AddPartitionsToTxnTopicResult topicResult : 
txnResult.topicResults()) {
+                            for 
(AddPartitionsToTxnResponseData.AddPartitionsToTxnPartitionResult 
partitionResult : topicResult.resultsByPartition()) {
+                                TopicPartition tp = new 
TopicPartition(topicResult.name(), partitionResult.partitionIndex());
+                                if (partitionResult.partitionErrorCode() != 
Errors.NONE.code()) {
+                                    // Producers expect to handle 
INVALID_PRODUCER_EPOCH in this scenario.
+                                    short code;
+                                    if (partitionResult.partitionErrorCode() 
== Errors.PRODUCER_FENCED.code()) {
+                                        code = 
Errors.INVALID_PRODUCER_EPOCH.code();
+                                    } else if 
(partitionResult.partitionErrorCode() == Errors.TRANSACTION_ABORTABLE.code()
+                                            && 
transactionDataAndCallbacks.transactionSupportedOperation().equals(DEFAULT_ERROR))
 { // For backward compatibility with clients
+                                        code = Errors.INVALID_TXN_STATE.code();
+                                    } else {
+                                        code = 
partitionResult.partitionErrorCode();
+                                    }
+                                    unverified.put(tp, Errors.forCode(code));
+                                }
+                            }
+                        }
+                        verificationFailureRate.mark(unverified.size());
+                        AppendCallback callback = 
transactionDataAndCallbacks.callbacks().get(txnResult.transactionalId());
+                        sendCallback(callback, unverified, 
transactionDataAndCallbacks.startTimeMs.get(txnResult.transactionalId()));
+                    }
+                }
+            }
+            wakeup();
+        }
+
+        private Map<TopicPartition, Errors> buildErrorMap(String 
transactionalId, short errorCode) {
+            AddPartitionsToTxnTransaction transactionData = 
transactionDataAndCallbacks.transactionData.find(transactionalId);
+            return topicPartitionsToError(transactionData, 
Errors.forCode(errorCode));
+        }
+
+        private void sendCallbacksToAll(short errorCode) {
+            transactionDataAndCallbacks.callbacks.forEach((txnId, cb) ->
+                    sendCallback(cb, buildErrorMap(txnId, errorCode), 
transactionDataAndCallbacks.startTimeMs.get(txnId)));
+        }
+    }
+
+    // Consulted for the leader of the transaction state topic partition and for the alive broker node of that leader.
+    private final MetadataCache metadataCache;
+    // Maps a transactional ID to the transaction state topic partition used to look up its coordinator.
+    private final Function<String, Integer> partitionFor;
+    // Clock used for request creation times and for the verification latency histogram.
+    private final Time time;
+
+    // Listener passed to the metadata cache when resolving the coordinator node.
+    private final ListenerName interBrokerListenerName;
+    // Nodes that had a request generated and whose completion handler has not run yet.
+    private final Set<Node> inflightNodes = new HashSet<>();
+    // Data queued per node until the next request is generated; accessed while synchronized on the map itself.
+    private final Map<Node, TransactionDataAndCallbacks> nodesToTransactions = 
new HashMap<>();
+
+    // For compatibility - this metrics group was previously defined within
+    // a Scala class named `kafka.server.AddPartitionsToTxnManager`
+    private final KafkaMetricsGroup metricsGroup = new 
KafkaMetricsGroup("kafka.server", "AddPartitionsToTxnManager");
+    // Marked with the number of partitions that are reported back with an error.
+    private final Meter verificationFailureRate = 
metricsGroup.newMeter(VERIFICATION_FAILURE_RATE_METRIC_NAME, "failures", 
TimeUnit.SECONDS);
+    // Updated with the milliseconds between queueing a transaction and completing its callback.
+    private final Histogram verificationTimeMs = 
metricsGroup.newHistogram(VERIFICATION_TIME_MS_METRIC_NAME);
+
+    /**
+     * Creates the manager. The thread name passed to the superclass is
+     * {@code "AddPartitionsToTxnSenderThread-"} followed by the broker ID from {@code config}.
+     *
+     * @param config        supplies the broker ID, the request timeout and the inter-broker listener name
+     * @param client        network client handed to the superclass
+     * @param metadataCache cache used to resolve the transaction coordinator node
+     * @param partitionFor  maps a transactional ID to its transaction state topic partition
+     * @param time          clock handed to the superclass and used for latency measurements
+     */
+    public AddPartitionsToTxnManager(
+            AbstractKafkaConfig config,
+            NetworkClient client,
+            MetadataCache metadataCache,
+            Function<String, Integer> partitionFor,
+            Time time) {
+        super("AddPartitionsToTxnSenderThread-" + config.brokerId(), client, 
config.requestTimeoutMs(), time);
+        this.interBrokerListenerName = config.interBrokerListenerName();
+        this.metadataCache = metadataCache;
+        this.partitionFor = partitionFor;
+        this.time = time;
+    }
+
+    /**
+     * Queues the given partitions to be added to, or verified against, the transaction identified by
+     * {@code transactionalId}. The request is queued for the node that currently leads the transaction state
+     * topic partition of that ID.
+     *
+     * <p>If no coordinator node can be resolved, {@code callback} is completed immediately with
+     * {@code COORDINATOR_NOT_AVAILABLE} for every partition and nothing is queued.
+     *
+     * @param transactionalId               the transactional ID of the producer
+     * @param producerId                    the producer ID
+     * @param producerEpoch                 the producer epoch
+     * @param topicPartitions               the partitions to add or verify
+     * @param callback                      completed with the per-partition errors once the outcome is known
+     * @param transactionSupportedOperation decides whether the request is verify-only
+     *                                      (verify-only when {@code supportsEpochBump} is false)
+     */
+    public void addOrVerifyTransaction(
+            String transactionalId,
+            long producerId,
+            short producerEpoch,
+            Collection<TopicPartition> topicPartitions,
+            AppendCallback callback,
+            TransactionSupportedOperation transactionSupportedOperation) {
+        Optional<Node> coordinator = getTransactionCoordinator(partitionFor.apply(transactionalId));
+        if (coordinator.isEmpty()) {
+            // Filled with put() rather than Collectors.toMap(): toMap without a merge function throws
+            // IllegalStateException on a repeated key, and a Collection may hold the same partition twice.
+            Map<TopicPartition, Errors> errors = new HashMap<>();
+            for (TopicPartition tp : topicPartitions) {
+                errors.put(tp, Errors.COORDINATOR_NOT_AVAILABLE);
+            }
+            callback.complete(errors);
+            return;
+        }
+
+        // Group the partitions by topic, the shape required by the request data.
+        AddPartitionsToTxnTopicCollection topicCollection = new AddPartitionsToTxnTopicCollection();
+        topicPartitions.stream().collect(Collectors.groupingBy(TopicPartition::topic)).forEach((topic, tps) -> {
+            topicCollection.add(new AddPartitionsToTxnTopic()
+                    .setName(topic)
+                    .setPartitions(tps.stream().map(TopicPartition::partition).collect(Collectors.toList())));
+        });
+
+        AddPartitionsToTxnTransaction transactionData = new AddPartitionsToTxnTransaction()
+                .setTransactionalId(transactionalId)
+                .setProducerId(producerId)
+                .setProducerEpoch(producerEpoch)
+                .setVerifyOnly(!transactionSupportedOperation.supportsEpochBump)
+                .setTopics(topicCollection);
+
+        addTxnData(coordinator.get(), transactionData, callback, transactionSupportedOperation);
+    }
+
+    private void addTxnData(
+            Node node,
+            AddPartitionsToTxnTransaction transactionData,
+            AppendCallback callback,
+            TransactionSupportedOperation transactionSupportedOperation) {
+        synchronized (nodesToTransactions) {
+            long curTime = time.milliseconds();
+            // Check if we have already had either node or individual 
transaction. Add the Node if it isn't there.
+            TransactionDataAndCallbacks existingNodeAndTransactionData = 
nodesToTransactions.computeIfAbsent(node,
+                    ignored -> new TransactionDataAndCallbacks(
+                            new AddPartitionsToTxnTransactionCollection(1),
+                            new HashMap<>(),
+                            new HashMap<>(),
+                            transactionSupportedOperation));
+
+            AddPartitionsToTxnTransaction existingTransactionData = 
existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId());
+
+            // There are 3 cases if we already have existing data
+            // 1. Incoming data has a higher epoch -- return 
INVALID_PRODUCER_EPOCH for existing data since it is fenced
+            // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION 
for existing data, since the client is likely retrying and we want another 
retriable exception
+            // 3. Incoming data has a lower epoch -- return 
INVALID_PRODUCER_EPOCH for the incoming data since it is fenced, do not add 
incoming data to verify
+            if (existingTransactionData != null) {
+                if (existingTransactionData.producerEpoch() <= 
transactionData.producerEpoch()) {
+                    Errors error = (existingTransactionData.producerEpoch() < 
transactionData.producerEpoch())
+                            ? Errors.INVALID_PRODUCER_EPOCH : 
Errors.NETWORK_EXCEPTION;
+                    AppendCallback oldCallback = 
existingNodeAndTransactionData.callbacks.get(transactionData.transactionalId());
+                    
existingNodeAndTransactionData.transactionData.remove(transactionData);
+                    sendCallback(oldCallback, 
topicPartitionsToError(existingTransactionData, error), 
existingNodeAndTransactionData.startTimeMs.get(transactionData.transactionalId()));
+                } else {
+                    // If the incoming transactionData's epoch is lower, we 
can return with INVALID_PRODUCER_EPOCH immediately.
+                    sendCallback(callback, 
topicPartitionsToError(transactionData, Errors.INVALID_PRODUCER_EPOCH), 
curTime);
+                    return;
+                }
+            }
+
+            
existingNodeAndTransactionData.transactionData.add(transactionData);
+            
existingNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), 
callback);
+            
existingNodeAndTransactionData.startTimeMs.put(transactionData.transactionalId(),
 curTime);
+            wakeup();
+        }
+    }
+
+    /**
+     * Resolves the node that leads the given partition of the transaction state topic.
+     *
+     * @param partition partition of the transaction state topic
+     * @return the alive broker node of the leader on the inter-broker listener, or empty when the metadata
+     *         cache has no entry for the partition, the partition has no leader, or the leader is not alive
+     */
+    private Optional<Node> getTransactionCoordinator(int partition) {
+        var partitionState = metadataCache.getLeaderAndIsr(Topic.TRANSACTION_STATE_TOPIC_NAME, partition);
+        var stateWithLeader = partitionState.filter(state -> state.leader() != MetadataResponse.NO_LEADER_ID);
+        return stateWithLeader.flatMap(state -> metadataCache.getAliveBrokerNode(state.leader(), interBrokerListenerName));
+    }
+
+    /**
+     * Builds a map that assigns {@code error} to every partition of {@code txnData} and marks the
+     * verification failure meter with the number of entries in that map.
+     *
+     * @param txnData the transaction whose topics and partitions are expanded
+     * @param error   the error to assign to each partition
+     * @return a new map from each partition of the transaction to {@code error}
+     */
+    private Map<TopicPartition, Errors> topicPartitionsToError(AddPartitionsToTxnTransaction txnData, Errors error) {
+        Map<TopicPartition, Errors> errorsByPartition = new HashMap<>();
+        for (var topic : txnData.topics()) {
+            for (var partition : topic.partitions()) {
+                errorsByPartition.put(new TopicPartition(topic.name(), partition), error);
+            }
+        }
+        verificationFailureRate.mark(errorsByPartition.size());
+        return errorsByPartition;
+    }
+
+    /**
+     * Records the time elapsed since {@code startTimeMs} in the verification time histogram and then
+     * completes {@code callback} with {@code errors}.
+     *
+     * @param callback    the callback to complete
+     * @param errors      the per-partition errors to hand to the callback
+     * @param startTimeMs the time, in milliseconds, at which the transaction was queued
+     */
+    private void sendCallback(AppendCallback callback, Map<TopicPartition, Errors> errors, long startTimeMs) {
+        long elapsedMs = time.milliseconds() - startTimeMs;
+        verificationTimeMs.update(elapsedMs);
+        callback.complete(errors);
+    }
+
+    /**
+     * Turns the queued per-node transaction data into requests. A node that still has a request in flight
+     * is skipped and its data stays queued; every other node gets one request, is marked in flight and has
+     * its entry removed from the queue.
+     *
+     * @return the requests to send, one per node without a request in flight
+     */
+    @Override
+    public Collection<RequestAndCompletionHandler> generateRequests() {
+        List<RequestAndCompletionHandler> requests = new ArrayList<>();
+        var nowMs = time.milliseconds();
+        synchronized (nodesToTransactions) {
+            var queuedEntries = nodesToTransactions.entrySet().iterator();
+            while (queuedEntries.hasNext()) {
+                var queued = queuedEntries.next();
+                var destination = queued.getKey();
+                if (inflightNodes.contains(destination)) {
+                    // Leave the data queued until the in-flight request for this node completes.
+                    continue;
+                }
+
+                var queuedData = queued.getValue();
+                requests.add(new RequestAndCompletionHandler(
+                        nowMs,
+                        destination,
+                        AddPartitionsToTxnRequest.Builder.forBroker(queuedData.transactionData()),
+                        new AddPartitionsToTxnHandler(destination, queuedData)
+                ));
+                inflightNodes.add(destination);
+                queuedEntries.remove();
+            }
+        }
+        return requests;
+    }
+
+    /**
+     * Delegates to {@code super.shutdown()} and removes the two metrics registered by this class.
+     *
+     * @throws InterruptedException if {@code super.shutdown()} throws it
+     */
+    @Override
+    public void shutdown() throws InterruptedException {
+        try {
+            super.shutdown();
+        } finally {
+            // Remove the metrics even when super.shutdown() throws, so they do not stay registered.
+            metricsGroup.removeMetric(VERIFICATION_FAILURE_RATE_METRIC_NAME);
+            metricsGroup.removeMetric(VERIFICATION_TIME_MS_METRIC_NAME);
+        }
+    }
+}
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
index 70ee6d2d624..011b0978b17 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
@@ -137,7 +137,7 @@ public class RaftClusterInvocationContext implements 
TestTemplateInvocationConte
                     .next()
                     .config()
                     .controllerListenerNames()
-                    .head()
+                    .get(0)
             );
         }
 

Reply via email to