This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 83fb40d7435 KAFKA-14895 [1/N] Move AddPartitionsToTxnManager files to java (#19879)
83fb40d7435 is described below
commit 83fb40d74350664a051366545f3098950bfd5b67
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Sun Jun 8 00:16:55 2025 +0800
KAFKA-14895 [1/N] Move AddPartitionsToTxnManager files to java (#19879)
Move AddPartitionsToTxnManager to server module and convert to Java.
This patch moves AddPartitionsToTxnManager from the core module to the
server module, with its package updated from `kafka.server` to
`org.apache.kafka.server.transaction`. Additionally, several
configuration accessors used by AddPartitionsToTxnManager are moved from
KafkaConfig.scala to AbstractKafkaConfig.java (signatures sketched below):
- brokerId
- requestTimeoutMs
- controllerListenerNames
- interBrokerListenerName
- interBrokerSecurityProtocol
- effectiveListenerSecurityProtocolMap
The next PR will move AddPartitionsToTxnManagerTest.scala to Java.
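The new AbstractKafkaConfig.java is not quoted in this message (the diffstat
below only records it at +115 lines), so the following is a hypothetical
signature sketch of the relocated accessors, with names and return types
inferred from call sites in the diff (for example
`controllerListenerNames().get(0)` and
`effectiveListenerSecurityProtocolMap().containsKey(...)`); the interface name
is illustrative only:

    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.common.network.ListenerName;
    import org.apache.kafka.common.security.auth.SecurityProtocol;

    // Hypothetical sketch of the accessors relocated to
    // org.apache.kafka.server.config.AbstractKafkaConfig; inferred, not copied.
    interface RelocatedConfigAccessors {
        int brokerId();
        int requestTimeoutMs();
        List<String> controllerListenerNames();           // empty list when unset
        ListenerName interBrokerListenerName();
        SecurityProtocol interBrokerSecurityProtocol();
        Map<ListenerName, SecurityProtocol> effectiveListenerSecurityProtocolMap();
    }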
Reviewers: Justine Olshan <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../group/CoordinatorPartitionWriter.scala | 3 +-
core/src/main/scala/kafka/raft/RaftManager.scala | 8 +-
.../kafka/server/AddPartitionsToTxnManager.scala | 315 ------------------
.../src/main/scala/kafka/server/BrokerServer.scala | 1 +
.../scala/kafka/server/DynamicBrokerConfig.scala | 4 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 1 +
core/src/main/scala/kafka/server/KafkaConfig.scala | 98 +-----
.../server/NodeToControllerChannelManager.scala | 5 +-
.../main/scala/kafka/server/ReplicaManager.scala | 18 +-
core/src/main/scala/kafka/tools/StorageTool.scala | 2 +-
core/src/main/scala/kafka/utils/CoreUtils.scala | 7 +-
.../kafka/server/QuorumTestHarness.scala | 2 +-
.../AbstractCoordinatorConcurrencyTest.scala | 1 +
.../server/AddPartitionsToTxnManagerTest.scala | 67 ++--
.../scala/unit/kafka/server/KafkaConfigTest.scala | 30 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 59 ++--
.../kafka/server/config/AbstractKafkaConfig.java | 115 +++++++
.../transaction/AddPartitionsToTxnManager.java | 357 +++++++++++++++++++++
.../test/junit/RaftClusterInvocationContext.java | 2 +-
19 files changed, 591 insertions(+), 504 deletions(-)
diff --git
a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
index 62c5bc2d6d9..dbbdbb09868 100644
---
a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
+++
b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
@@ -17,13 +17,14 @@
package kafka.coordinator.group
import kafka.cluster.PartitionListener
-import kafka.server.{AddPartitionsToTxnManager, ReplicaManager}
+import kafka.server.ReplicaManager
import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
import org.apache.kafka.coordinator.common.runtime.PartitionWriter
import org.apache.kafka.server.ActionQueue
import org.apache.kafka.server.common.RequestLocal
+import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig,
VerificationGuard}
import java.util.concurrent.CompletableFuture
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala
b/core/src/main/scala/kafka/raft/RaftManager.scala
index d7775d88dbe..9e8ea38f8fd 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -236,11 +236,9 @@ class KafkaRaftManager[T](
}
private def buildNetworkClient(): (ListenerName, NetworkClient) = {
- val controllerListenerName = new
ListenerName(config.controllerListenerNames.head)
- val controllerSecurityProtocol =
config.effectiveListenerSecurityProtocolMap.getOrElse(
- controllerListenerName,
- SecurityProtocol.forName(controllerListenerName.value())
- )
+ val controllerListenerName = new
ListenerName(config.controllerListenerNames.get(0))
+ val controllerSecurityProtocol =
Option(config.effectiveListenerSecurityProtocolMap.get(controllerListenerName))
+ .getOrElse(SecurityProtocol.forName(controllerListenerName.value()))
val channelBuilder = ChannelBuilders.clientChannelBuilder(
controllerSecurityProtocol,
JaasContext.Type.SERVER,
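Because `effectiveListenerSecurityProtocolMap` now returns a `java.util.Map`, a
missing listener yields `null` rather than a Scala `None`, which is why the
hunk above wraps the lookup in `Option(...)`. A Java-side caller could express
the same lazy fallback as below; this snippet is illustrative only and assumes,
per the commit message, that the accessors now live on AbstractKafkaConfig:

    import org.apache.kafka.common.network.ListenerName;
    import org.apache.kafka.common.security.auth.SecurityProtocol;
    import org.apache.kafka.server.config.AbstractKafkaConfig;

    final class ControllerListenerExample {
        // Resolve the first controller listener and its security protocol,
        // falling back to the protocol named by the listener only when the
        // map has no entry for it.
        static SecurityProtocol controllerSecurityProtocol(AbstractKafkaConfig config) {
            ListenerName listener = new ListenerName(config.controllerListenerNames().get(0));
            SecurityProtocol protocol = config.effectiveListenerSecurityProtocolMap().get(listener);
            return protocol != null ? protocol : SecurityProtocol.forName(listener.value());
        }
    }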
diff --git a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala
b/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala
deleted file mode 100644
index b7e3bd36d84..00000000000
--- a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala
+++ /dev/null
@@ -1,315 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import
kafka.server.AddPartitionsToTxnManager.{VerificationFailureRateMetricName,
VerificationTimeMsMetricName}
-import kafka.utils.Logging
-import org.apache.kafka.clients.{ClientResponse, NetworkClient,
RequestCompletionHandler}
-import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.{Node, TopicPartition}
-import
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic,
AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction,
AddPartitionsToTxnTransactionCollection}
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest,
AddPartitionsToTxnResponse, MetadataResponse}
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.metadata.MetadataCache
-import org.apache.kafka.server.metrics.KafkaMetricsGroup
-import org.apache.kafka.server.util.{InterBrokerSendThread,
RequestAndCompletionHandler}
-
-import java.util
-import java.util.concurrent.TimeUnit
-import scala.collection.{Seq, mutable}
-import scala.jdk.CollectionConverters._
-
-object AddPartitionsToTxnManager {
- type AppendCallback = Map[TopicPartition, Errors] => Unit
-
- val VerificationFailureRateMetricName = "VerificationFailureRate"
- val VerificationTimeMsMetricName = "VerificationTimeMs"
-
- def produceRequestVersionToTransactionSupportedOperation(version: Short):
TransactionSupportedOperation = {
- if (version > 11) {
- addPartition
- } else if (version > 10) {
- genericErrorSupported
- } else {
- defaultError
- }
- }
-
- def txnOffsetCommitRequestVersionToTransactionSupportedOperation(version:
Int): TransactionSupportedOperation = {
- if (version > 4) {
- addPartition
- } else if (version > 3) {
- genericErrorSupported
- } else {
- defaultError
- }
- }
-}
-
-/**
- * This is an enum which handles the Partition Response based on the Request
Version and the exact operation
- * defaultError: This is the default workflow which maps to cases
when the Produce Request Version or the Txn_offset_commit request was lower
than the first version supporting the new Error Class
- * genericErrorSupported: This maps to the case when the clients are
updated to handle the TransactionAbortableException
- * addPartition: This allows the partition to be added to the
transactions inflight with the Produce and TxnOffsetCommit requests. Plus the
behaviors in genericErrorSupported.
- */
-sealed trait TransactionSupportedOperation {
- val supportsEpochBump = false;
-}
-case object defaultError extends TransactionSupportedOperation
-case object genericErrorSupported extends TransactionSupportedOperation
-case object addPartition extends TransactionSupportedOperation {
- override val supportsEpochBump = true
-}
-
-/*
- * Data structure to hold the transactional data to send to a node. Note -- at
most one request per transactional ID
- * will exist at a time in the map. If a given transactional ID exists in the
map, and a new request with the same ID
- * comes in, one request will be in the map and one will return to the
producer with a response depending on the epoch.
- */
-class TransactionDataAndCallbacks(val transactionData:
AddPartitionsToTxnTransactionCollection,
- val callbacks: mutable.Map[String,
AddPartitionsToTxnManager.AppendCallback],
- val startTimeMs: mutable.Map[String, Long],
- val transactionSupportedOperation:
TransactionSupportedOperation)
-
-class AddPartitionsToTxnManager(
- config: KafkaConfig,
- client: NetworkClient,
- metadataCache: MetadataCache,
- partitionFor: String => Int,
- time: Time
-) extends InterBrokerSendThread(
- "AddPartitionsToTxnSenderThread-" + config.brokerId,
- client,
- config.requestTimeoutMs,
- time
-) with Logging {
-
- this.logIdent = logPrefix
-
- private val interBrokerListenerName = config.interBrokerListenerName
- private val inflightNodes = mutable.HashSet[Node]()
- private val nodesToTransactions = mutable.Map[Node,
TransactionDataAndCallbacks]()
-
- private val metricsGroup = new KafkaMetricsGroup(this.getClass)
- private val verificationFailureRate =
metricsGroup.newMeter(VerificationFailureRateMetricName, "failures",
TimeUnit.SECONDS)
- private val verificationTimeMs =
metricsGroup.newHistogram(VerificationTimeMsMetricName)
-
- def addOrVerifyTransaction(
- transactionalId: String,
- producerId: Long,
- producerEpoch: Short,
- topicPartitions: Seq[TopicPartition],
- callback: AddPartitionsToTxnManager.AppendCallback,
- transactionSupportedOperation: TransactionSupportedOperation
- ): Unit = {
- val coordinatorNode =
getTransactionCoordinator(partitionFor(transactionalId))
- if (coordinatorNode.isEmpty) {
- callback(topicPartitions.map(tp => tp ->
Errors.COORDINATOR_NOT_AVAILABLE).toMap)
- } else {
- val topicCollection = new AddPartitionsToTxnTopicCollection()
- topicPartitions.groupBy(_.topic).foreachEntry { (topic, tps) =>
- topicCollection.add(new AddPartitionsToTxnTopic()
- .setName(topic)
- .setPartitions(tps.map(tp => Int.box(tp.partition)).toList.asJava))
- }
-
- val transactionData = new AddPartitionsToTxnTransaction()
- .setTransactionalId(transactionalId)
- .setProducerId(producerId)
- .setProducerEpoch(producerEpoch)
- .setVerifyOnly(!transactionSupportedOperation.supportsEpochBump)
- .setTopics(topicCollection)
-
- addTxnData(coordinatorNode.get, transactionData, callback,
transactionSupportedOperation)
-
- }
- }
-
- private def addTxnData(
- node: Node,
- transactionData: AddPartitionsToTxnTransaction,
- callback: AddPartitionsToTxnManager.AppendCallback,
- transactionSupportedOperation: TransactionSupportedOperation
- ): Unit = {
- nodesToTransactions.synchronized {
- val curTime = time.milliseconds()
- // Check if we have already have either node or individual transaction.
Add the Node if it isn't there.
- val existingNodeAndTransactionData =
nodesToTransactions.getOrElseUpdate(node,
- new TransactionDataAndCallbacks(
- new AddPartitionsToTxnTransactionCollection(1),
- mutable.Map[String, AddPartitionsToTxnManager.AppendCallback](),
- mutable.Map[String, Long](),
- transactionSupportedOperation))
-
- val existingTransactionData =
existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
-
- // There are 3 cases if we already have existing data
- // 1. Incoming data has a higher epoch -- return INVALID_PRODUCER_EPOCH
for existing data since it is fenced
- // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION for
existing data, since the client is likely retrying and we want another
retriable exception
- // 3. Incoming data has a lower epoch -- return INVALID_PRODUCER_EPOCH
for the incoming data since it is fenced, do not add incoming data to verify
- if (existingTransactionData != null) {
- if (existingTransactionData.producerEpoch <=
transactionData.producerEpoch) {
- val error = if (existingTransactionData.producerEpoch <
transactionData.producerEpoch)
- Errors.INVALID_PRODUCER_EPOCH
- else
- Errors.NETWORK_EXCEPTION
- val oldCallback =
existingNodeAndTransactionData.callbacks(transactionData.transactionalId)
-
existingNodeAndTransactionData.transactionData.remove(transactionData)
- sendCallback(oldCallback,
topicPartitionsToError(existingTransactionData, error),
existingNodeAndTransactionData.startTimeMs(transactionData.transactionalId))
- } else {
- // If the incoming transactionData's epoch is lower, we can return
with INVALID_PRODUCER_EPOCH immediately.
- sendCallback(callback, topicPartitionsToError(transactionData,
Errors.INVALID_PRODUCER_EPOCH), curTime)
- return
- }
- }
-
- existingNodeAndTransactionData.transactionData.add(transactionData)
-
existingNodeAndTransactionData.callbacks.put(transactionData.transactionalId,
callback)
-
existingNodeAndTransactionData.startTimeMs.put(transactionData.transactionalId,
curTime)
- wakeup()
- }
- }
-
- private def getTransactionCoordinator(partition: Int): util.Optional[Node] =
{
- metadataCache.getLeaderAndIsr(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)
- .filter(_.leader != MetadataResponse.NO_LEADER_ID)
- .flatMap(metadata => metadataCache.getAliveBrokerNode(metadata.leader,
interBrokerListenerName))
- }
-
- private def topicPartitionsToError(transactionData:
AddPartitionsToTxnTransaction, error: Errors): Map[TopicPartition, Errors] = {
- val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
- transactionData.topics.forEach { topic =>
- topic.partitions.forEach { partition =>
- topicPartitionsToError.put(new TopicPartition(topic.name, partition),
error)
- }
- }
- verificationFailureRate.mark(topicPartitionsToError.size)
- topicPartitionsToError.toMap
- }
-
- private def sendCallback(callback: AddPartitionsToTxnManager.AppendCallback,
errorMap: Map[TopicPartition, Errors], startTimeMs: Long): Unit = {
- verificationTimeMs.update(time.milliseconds() - startTimeMs)
- callback(errorMap)
- }
-
- private class AddPartitionsToTxnHandler(node: Node,
transactionDataAndCallbacks: TransactionDataAndCallbacks) extends
RequestCompletionHandler {
- override def onComplete(response: ClientResponse): Unit = {
- // Note: Synchronization is not needed on inflightNodes since it is
always accessed from this thread.
- inflightNodes.remove(node)
- if (response.authenticationException != null) {
- error(s"AddPartitionsToTxnRequest failed for node
${response.destination} with an " +
- "authentication exception.", response.authenticationException)
-
sendCallbacksToAll(Errors.forException(response.authenticationException).code)
- } else if (response.versionMismatch != null) {
- // We may see unsupported version exception if we try to send a verify
only request to a broker that can't handle it.
- // In this case, skip verification.
- warn(s"AddPartitionsToTxnRequest failed for node
${response.destination} with invalid version exception. This suggests
verification is not supported." +
- s"Continuing handling the produce request.")
- transactionDataAndCallbacks.callbacks.foreach { case (txnId, callback)
=>
- sendCallback(callback, Map.empty,
transactionDataAndCallbacks.startTimeMs(txnId))
- }
- } else if (response.wasDisconnected || response.wasTimedOut) {
- warn(s"AddPartitionsToTxnRequest failed for node
${response.destination} with a network exception.")
- sendCallbacksToAll(Errors.NETWORK_EXCEPTION.code)
- } else {
- val addPartitionsToTxnResponseData =
response.responseBody.asInstanceOf[AddPartitionsToTxnResponse].data
- if (addPartitionsToTxnResponseData.errorCode != 0) {
- error(s"AddPartitionsToTxnRequest for node ${response.destination}
returned with error
${Errors.forCode(addPartitionsToTxnResponseData.errorCode)}.")
- // The client should not be exposed to CLUSTER_AUTHORIZATION_FAILED
so modify the error to signify the verification did not complete.
- // Return INVALID_TXN_STATE.
- val finalError = if (addPartitionsToTxnResponseData.errorCode ==
Errors.CLUSTER_AUTHORIZATION_FAILED.code)
- Errors.INVALID_TXN_STATE.code
- else
- addPartitionsToTxnResponseData.errorCode
-
- sendCallbacksToAll(finalError)
- } else {
- addPartitionsToTxnResponseData.resultsByTransaction.forEach {
transactionResult =>
- val unverified = mutable.Map[TopicPartition, Errors]()
- transactionResult.topicResults.forEach { topicResult =>
- topicResult.resultsByPartition.forEach { partitionResult =>
- val tp = new TopicPartition(topicResult.name,
partitionResult.partitionIndex)
- if (partitionResult.partitionErrorCode != Errors.NONE.code) {
- // Producers expect to handle INVALID_PRODUCER_EPOCH in this
scenario.
- val code =
- if (partitionResult.partitionErrorCode ==
Errors.PRODUCER_FENCED.code)
- Errors.INVALID_PRODUCER_EPOCH.code
- else if (partitionResult.partitionErrorCode() ==
Errors.TRANSACTION_ABORTABLE.code
- &&
transactionDataAndCallbacks.transactionSupportedOperation == defaultError) //
For backward compatibility with clients.
- Errors.INVALID_TXN_STATE.code
- else
- partitionResult.partitionErrorCode
- unverified.put(tp, Errors.forCode(code))
- }
- }
- }
- verificationFailureRate.mark(unverified.size)
- val callback =
transactionDataAndCallbacks.callbacks(transactionResult.transactionalId)
- sendCallback(callback, unverified.toMap,
transactionDataAndCallbacks.startTimeMs(transactionResult.transactionalId))
- }
- }
- }
- wakeup()
- }
-
- private def buildErrorMap(transactionalId: String, errorCode: Short):
Map[TopicPartition, Errors] = {
- val transactionData =
transactionDataAndCallbacks.transactionData.find(transactionalId)
- topicPartitionsToError(transactionData, Errors.forCode(errorCode))
- }
-
- private def sendCallbacksToAll(errorCode: Short): Unit = {
- transactionDataAndCallbacks.callbacks.foreach { case (txnId, callback) =>
- sendCallback(callback, buildErrorMap(txnId, errorCode),
transactionDataAndCallbacks.startTimeMs(txnId))
- }
- }
- }
-
- override def generateRequests():
util.Collection[RequestAndCompletionHandler] = {
- // build and add requests to queue
- val list = new util.ArrayList[RequestAndCompletionHandler]()
- val currentTimeMs = time.milliseconds()
- val removedNodes = mutable.Set[Node]()
- nodesToTransactions.synchronized {
- nodesToTransactions.foreach { case (node, transactionDataAndCallbacks) =>
- if (!inflightNodes.contains(node)) {
- list.add(new RequestAndCompletionHandler(
- currentTimeMs,
- node,
-
AddPartitionsToTxnRequest.Builder.forBroker(transactionDataAndCallbacks.transactionData),
- new AddPartitionsToTxnHandler(node, transactionDataAndCallbacks)
- ))
-
- removedNodes.add(node)
- }
- }
- removedNodes.foreach { node =>
- inflightNodes.add(node)
- nodesToTransactions.remove(node)
- }
- }
- list
- }
-
- override def shutdown(): Unit = {
- super.shutdown()
- metricsGroup.removeMetric(VerificationFailureRateMetricName)
- metricsGroup.removeMetric(VerificationTimeMsMetricName)
- }
-
-}
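The Java replacement for the deleted file above is added as
org.apache.kafka.server.transaction.AddPartitionsToTxnManager (357 lines in the
diffstat) but is not quoted in this message. The test changes further down
import ADD_PARTITION, DEFAULT_ERROR and GENERIC_ERROR_SUPPORTED from a nested
TransactionSupportedOperation type, suggesting the sealed trait plus case
objects above became an enum along these lines; this is a sketch of the likely
shape, not the committed code:

    // Hypothetical sketch of the nested enum; preserves the supportsEpochBump
    // distinction carried by the Scala case objects it replaces.
    public enum TransactionSupportedOperation {
        // Request versions that predate the new error class.
        DEFAULT_ERROR(false),
        // Clients that understand TransactionAbortableException.
        GENERIC_ERROR_SUPPORTED(false),
        // Partitions may be added to the in-flight transaction; includes the
        // generic-error behavior as well.
        ADD_PARTITION(true);

        public final boolean supportsEpochBump;

        TransactionSupportedOperation(boolean supportsEpochBump) {
            this.supportsEpochBump = supportsEpochBump;
        }
    }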
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 22749f1f4af..22c6847ded0 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -55,6 +55,7 @@ import org.apache.kafka.server.share.session.ShareSessionCache
import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures,
ClientMetricsManager, DefaultApiVersionManager, DelayedActionQueue,
DelegationTokenManager, ProcessRole}
+import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 94131c65d8c..bdd9c94a319 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -973,7 +973,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends
BrokerReconfigurable wi
val oldListeners = oldConfig.listeners.map(l =>
ListenerName.normalised(l.listener)).toSet
if (!oldAdvertisedListeners.subsetOf(newListeners))
throw new ConfigException(s"Advertised listeners
'$oldAdvertisedListeners' must be a subset of listeners '$newListeners'")
- if
(!newListeners.subsetOf(newConfig.effectiveListenerSecurityProtocolMap.keySet))
+ if
(!newListeners.subsetOf(newConfig.effectiveListenerSecurityProtocolMap.keySet.asScala))
throw new ConfigException(s"Listeners '$newListeners' must be subset of
listener map '${newConfig.effectiveListenerSecurityProtocolMap}'")
newListeners.intersect(oldListeners).foreach { listenerName =>
def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String):
Map[String, AnyRef] = {
@@ -985,7 +985,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends
BrokerReconfigurable wi
if (immutableListenerConfigs(newConfig, listenerName.configPrefix) !=
immutableListenerConfigs(oldConfig, listenerName.configPrefix))
throw new ConfigException(s"Configs cannot be updated dynamically for
existing listener $listenerName, " +
"restart broker or create a new listener for update")
- if (oldConfig.effectiveListenerSecurityProtocolMap(listenerName) !=
newConfig.effectiveListenerSecurityProtocolMap(listenerName))
+ if (oldConfig.effectiveListenerSecurityProtocolMap.get(listenerName) !=
newConfig.effectiveListenerSecurityProtocolMap.get(listenerName))
throw new ConfigException(s"Security protocol cannot be updated for
existing listener $listenerName")
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 595ef5dc2c5..3ca43aa145b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -68,6 +68,7 @@ import org.apache.kafka.server.share.context.ShareFetchContext
import org.apache.kafka.server.share.{ErroneousAndValidPartitionData,
SharePartitionKey}
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams,
FetchPartitionData}
+import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
import org.apache.kafka.storage.internals.log.AppendOrigin
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 487e0b1fac4..67c6febe1ca 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
import java.util.Properties
import kafka.utils.{CoreUtils, Logging}
import kafka.utils.Implicits._
-import org.apache.kafka.common.{Endpoint, KafkaException, Reconfigurable}
+import org.apache.kafka.common.{Endpoint, Reconfigurable}
import org.apache.kafka.common.config.{ConfigDef, ConfigException,
ConfigResource, TopicConfig}
import org.apache.kafka.common.config.ConfigDef.ConfigKey
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
@@ -44,10 +44,10 @@ import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.config.AbstractKafkaConfig.getMap
import org.apache.kafka.server.config.{AbstractKafkaConfig, KRaftConfigs,
QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.MetricConfigs
-import org.apache.kafka.server.util.Csv
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import scala.jdk.CollectionConverters._
@@ -144,14 +144,6 @@ object KafkaConfig {
}
output
}
-
- private def parseListenerName(connectionString: String): String = {
- val firstColon = connectionString.indexOf(':')
- if (firstColon < 0) {
- throw new KafkaException(s"Unable to parse a listener name from
$connectionString")
- }
- connectionString.substring(0, firstColon).toUpperCase(util.Locale.ROOT)
- }
}
/**
@@ -216,7 +208,6 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
def quotaConfig: QuotaConfig = _quotaConfig
/** ********* General Configuration ***********/
- var brokerId: Int = getInt(ServerConfigs.BROKER_ID_CONFIG)
val nodeId: Int = getInt(KRaftConfigs.NODE_ID_CONFIG)
val initialRegistrationTimeoutMs: Int =
getInt(KRaftConfigs.INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG)
val brokerHeartbeatIntervalMs: Int =
getInt(KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG)
@@ -255,7 +246,6 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
val serverMaxStartupTimeMs =
getLong(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG)
def messageMaxBytes = getInt(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG)
- val requestTimeoutMs = getInt(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG)
val connectionSetupTimeoutMs =
getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG)
val connectionSetupTimeoutMaxMs =
getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG)
@@ -306,7 +296,7 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
val socketListenBacklogSize =
getInt(SocketServerConfigs.SOCKET_LISTEN_BACKLOG_SIZE_CONFIG)
def maxConnectionsPerIp =
getInt(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG)
def maxConnectionsPerIpOverrides: Map[String, Int] =
- getMap(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG,
getString(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG)).map {
case (k, v) => (k, v.toInt)}
+ getMap(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG,
getString(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG)).asScala.map
{ case (k, v) => (k, v.toInt)}
def maxConnections = getInt(SocketServerConfigs.MAX_CONNECTIONS_CONFIG)
def maxConnectionCreationRate =
getInt(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG)
val connectionsMaxIdleMs =
getLong(SocketServerConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG)
@@ -408,8 +398,6 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
Set.empty[String]
}
- def interBrokerListenerName =
getInterBrokerListenerNameAndSecurityProtocol._1
- def interBrokerSecurityProtocol =
getInterBrokerListenerNameAndSecurityProtocol._2
def saslMechanismInterBrokerProtocol =
getString(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG)
/** ********* Fetch Configuration **************/
@@ -453,26 +441,9 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
millis
}
- private def getMap(propName: String, propValue: String): Map[String, String]
= {
- try {
- Csv.parseCsvMap(propValue).asScala
- } catch {
- case e: Exception => throw new IllegalArgumentException("Error parsing
configuration property '%s': %s".format(propName, e.getMessage))
- }
- }
-
def listeners: Seq[Endpoint] =
CoreUtils.listenerListToEndPoints(getString(SocketServerConfigs.LISTENERS_CONFIG),
effectiveListenerSecurityProtocolMap)
- def controllerListenerNames: Seq[String] = {
- val value =
Option(getString(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG)).getOrElse("")
- if (value.isEmpty) {
- Seq.empty
- } else {
- value.split(",")
- }
- }
-
def controllerListeners: Seq[Endpoint] =
listeners.filter(l => controllerListenerNames.contains(l.listener))
@@ -495,7 +466,7 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
}
val controllerListenersValue = controllerListeners
- controllerListenerNames.flatMap { name =>
+ controllerListenerNames.asScala.flatMap { name =>
controllerAdvertisedListeners
.find(endpoint =>
ListenerName.normalised(endpoint.listener).equals(ListenerName.normalised(name)))
.orElse(
@@ -526,57 +497,6 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
advertisedListeners.filterNot(l =>
controllerListenerNames.contains(l.listener))
}
- private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName,
SecurityProtocol) = {
- Option(getString(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG))
match {
- case Some(_) if
originals.containsKey(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG)
=>
- throw new ConfigException(s"Only one of
${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} and " +
- s"${ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG} should
be set.")
- case Some(name) =>
- val listenerName = ListenerName.normalised(name)
- val securityProtocol =
effectiveListenerSecurityProtocolMap.getOrElse(listenerName,
- throw new ConfigException(s"Listener with name ${listenerName.value}
defined in " +
- s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} not
found in ${SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG}."))
- (listenerName, securityProtocol)
- case None =>
- val securityProtocol =
getSecurityProtocol(getString(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG),
- ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG)
- (ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
- }
- }
-
- private def getSecurityProtocol(protocolName: String, configName: String):
SecurityProtocol = {
- try SecurityProtocol.forName(protocolName)
- catch {
- case _: IllegalArgumentException =>
- throw new ConfigException(s"Invalid security protocol `$protocolName`
defined in $configName")
- }
- }
-
- def effectiveListenerSecurityProtocolMap: Map[ListenerName,
SecurityProtocol] = {
- val mapValue =
getMap(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
getString(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG))
- .map { case (listenerName, protocolName) =>
- ListenerName.normalised(listenerName) ->
getSecurityProtocol(protocolName,
SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)
- }
- if
(!originals.containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG))
{
- // Nothing was specified explicitly for listener.security.protocol.map,
so we are using the default value,
- // and we are using KRaft.
- // Add PLAINTEXT mappings for controller listeners as long as there is
no SSL or SASL_{PLAINTEXT,SSL} in use
- def isSslOrSasl(name: String): Boolean =
name.equals(SecurityProtocol.SSL.name) ||
name.equals(SecurityProtocol.SASL_SSL.name) ||
name.equals(SecurityProtocol.SASL_PLAINTEXT.name)
- // check controller listener names (they won't appear in listeners when
process.roles=broker)
- // as well as listeners for occurrences of SSL or SASL_*
- if (controllerListenerNames.exists(isSslOrSasl) ||
-
Csv.parseCsvList(getString(SocketServerConfigs.LISTENERS_CONFIG)).asScala.exists(listenerValue
=> isSslOrSasl(KafkaConfig.parseListenerName(listenerValue)))) {
- mapValue // don't add default mappings since we found something that
is SSL or SASL_*
- } else {
- // add the PLAINTEXT mappings for all controller listener names that
are not explicitly PLAINTEXT
- mapValue ++
controllerListenerNames.filterNot(SecurityProtocol.PLAINTEXT.name.equals(_)).map(
- new ListenerName(_) -> SecurityProtocol.PLAINTEXT)
- }
- } else {
- mapValue
- }
- }
-
validateValues()
private def validateValues(): Unit = {
@@ -617,7 +537,7 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
}
def
validateControllerListenerNamesMustAppearInListenersForKRaftController(): Unit
= {
val listenerNameValues = listeners.map(_.listener).toSet
- require(controllerListenerNames.forall(cln =>
listenerNameValues.contains(cln)),
+ require(controllerListenerNames.stream().allMatch(cln =>
listenerNameValues.contains(cln)),
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must only contain
values appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration
when running the KRaft controller role")
}
def validateAdvertisedBrokerListenersNonEmptyForBroker(): Unit = {
@@ -637,22 +557,22 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
require(!voterIds.contains(nodeId),
s"If ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains just the 'broker'
role, the node id $nodeId must not be included in the set of voters
${QuorumConfig.QUORUM_VOTERS_CONFIG}=${voterIds.asScala.toSet}")
// controller.listener.names must be non-empty...
- require(controllerListenerNames.nonEmpty,
+ require(controllerListenerNames.size() > 0,
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must contain at
least one value when running KRaft with just the broker role")
// controller.listener.names are forbidden in listeners...
require(controllerListeners.isEmpty,
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must not contain a
value appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration
when running KRaft with just the broker role")
// controller.listener.names must all appear in
listener.security.protocol.map
- controllerListenerNames.foreach { name =>
+ controllerListenerNames.forEach { name =>
val listenerName = ListenerName.normalised(name)
- if (!effectiveListenerSecurityProtocolMap.contains(listenerName)) {
+ if (!effectiveListenerSecurityProtocolMap.containsKey(listenerName)) {
throw new ConfigException(s"Controller listener with name
${listenerName.value} defined in " +
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} not found in
${SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG} (an explicit
security mapping for each controller listener is required if
${SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG} is non-empty, or
if there are security protocols other than PLAINTEXT in use)")
}
}
// warn that only the first controller listener is used if there is more
than one
if (controllerListenerNames.size > 1) {
- warn(s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} has multiple
entries; only the first will be used since
${KRaftConfigs.PROCESS_ROLES_CONFIG}=broker: ${controllerListenerNames.asJava}")
+ warn(s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} has multiple
entries; only the first will be used since
${KRaftConfigs.PROCESS_ROLES_CONFIG}=broker: ${controllerListenerNames}")
}
// warn if create.topic.policy.class.name or
alter.config.policy.class.name is defined in the broker role
warnIfConfigDefinedInWrongRole(ProcessRole.ControllerRole,
ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG)
diff --git
a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
index c353a825503..cd6b8e1d134 100644
--- a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
@@ -55,8 +55,9 @@ object RaftControllerNodeProvider {
raftManager: RaftManager[ApiMessageAndVersion],
config: KafkaConfig,
): RaftControllerNodeProvider = {
- val controllerListenerName = new
ListenerName(config.controllerListenerNames.head)
- val controllerSecurityProtocol =
config.effectiveListenerSecurityProtocolMap.getOrElse(controllerListenerName,
SecurityProtocol.forName(controllerListenerName.value()))
+ val controllerListenerName = new
ListenerName(config.controllerListenerNames.get(0))
+ val controllerSecurityProtocol =
Option(config.effectiveListenerSecurityProtocolMap.get(controllerListenerName))
+ .getOrElse(SecurityProtocol.forName(controllerListenerName.value()))
val controllerSaslMechanism = config.saslMechanismControllerProtocol
new RaftControllerNodeProvider(
raftManager,
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 32f51acf77f..7c1b13b798c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -59,6 +59,8 @@ import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords,
DelayedOperationPurgatory, DelayedRemoteListOffsets,
DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus,
TopicPartitionOperationKey}
import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey,
DelayedShareFetchPartitionKey}
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
+import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
+import
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation
import org.apache.kafka.server.util.timer.{SystemTimer, TimerTask}
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
import org.apache.kafka.server.{ActionQueue, DelayedActionQueue,
LogReadResult, common}
@@ -1054,18 +1056,18 @@ class ReplicaManager(val config: KafkaConfig,
}
def invokeCallback(
- verificationErrors: Map[TopicPartition, Errors]
+ verificationErrors: java.util.Map[TopicPartition, Errors]
): Unit = {
- callback((errors ++ verificationErrors, verificationGuards.toMap))
+ callback((errors ++ verificationErrors.asScala,
verificationGuards.toMap))
}
addPartitionsToTxnManager.foreach(_.addOrVerifyTransaction(
- transactionalId = transactionalId,
- producerId = producerId,
- producerEpoch = producerEpoch,
- topicPartitions = verificationGuards.keys.toSeq,
- callback = invokeCallback,
- transactionSupportedOperation = transactionSupportedOperation
+ transactionalId,
+ producerId,
+ producerEpoch,
+ verificationGuards.keys.toSeq.asJava,
+ invokeCallback,
+ transactionSupportedOperation
))
}
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala
b/core/src/main/scala/kafka/tools/StorageTool.scala
index 08a29b3d01d..c7b4f28e336 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -126,7 +126,7 @@ object StorageTool extends Logging {
setClusterId(namespace.getString("cluster_id")).
setUnstableFeatureVersionsEnabled(config.unstableFeatureVersionsEnabled).
setIgnoreFormatted(namespace.getBoolean("ignore_formatted")).
- setControllerListenerName(config.controllerListenerNames.head).
+ setControllerListenerName(config.controllerListenerNames.get(0)).
setMetadataLogDirectory(config.metadataLogDir)
Option(namespace.getString("release_version")).foreach(
releaseVersion => formatter.
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala
b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 8af81d97a49..31a8aad7c80 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -23,7 +23,6 @@ import java.lang.management.ManagementFactory
import com.typesafe.scalalogging.Logger
import javax.management.ObjectName
-import scala.collection._
import scala.collection.Seq
import org.apache.commons.validator.routines.InetAddressValidator
import org.apache.kafka.common.Endpoint
@@ -122,7 +121,7 @@ object CoreUtils {
def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T =
inLock[T](lock.writeLock)(fun)
- def listenerListToEndPoints(listeners: String, securityProtocolMap:
Map[ListenerName, SecurityProtocol]): Seq[Endpoint] = {
+ def listenerListToEndPoints(listeners: String, securityProtocolMap:
java.util.Map[ListenerName, SecurityProtocol]): Seq[Endpoint] = {
listenerListToEndPoints(listeners, securityProtocolMap,
requireDistinctPorts = true)
}
@@ -131,7 +130,7 @@ object CoreUtils {
require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners: $listeners")
}
- def listenerListToEndPoints(listeners: String, securityProtocolMap:
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean):
Seq[Endpoint] = {
+ def listenerListToEndPoints(listeners: String, securityProtocolMap:
java.util.Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean):
Seq[Endpoint] = {
def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
(inetAddressValidator.isValidInet4Address(first) &&
inetAddressValidator.isValidInet6Address(second)) ||
(inetAddressValidator.isValidInet6Address(first) &&
inetAddressValidator.isValidInet4Address(second))
@@ -186,7 +185,7 @@ object CoreUtils {
}
val endPoints = try {
- SocketServerConfigs.listenerListToEndPoints(listeners,
securityProtocolMap.asJava).asScala
+ SocketServerConfigs.listenerListToEndPoints(listeners,
securityProtocolMap).asScala
} catch {
case e: Exception =>
throw new IllegalArgumentException(s"Error creating broker listeners
from '$listeners': ${e.getMessage}", e)
diff --git
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 85b8e2298ef..6c491e739e3 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -275,7 +275,7 @@ abstract class QuorumTestHarness extends Logging {
formatter.addDirectory(metadataDir.getAbsolutePath)
formatter.setReleaseVersion(metadataVersion)
formatter.setUnstableFeatureVersionsEnabled(true)
- formatter.setControllerListenerName(config.controllerListenerNames.head)
+ formatter.setControllerListenerName(config.controllerListenerNames.get(0))
formatter.setMetadataLogDirectory(config.metadataLogDir)
val transactionVersion =
diff --git
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index 2e9f95beb51..d5dadcfd9f4 100644
---
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -34,6 +34,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords,
DelayedOperationPurgatory, DelayedRemoteListOffsets, TopicPartitionOperationKey}
+import
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation
import org.apache.kafka.server.util.timer.{MockTimer, Timer}
import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig,
UnifiedLog, VerificationGuard}
diff --git
a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala
b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala
index 939d63789ff..84c0e41b724 100644
--- a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala
@@ -31,6 +31,9 @@ import org.apache.kafka.common.requests.{AbstractResponse,
AddPartitionsToTxnReq
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.metadata.{LeaderAndIsr, MetadataCache}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
+import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
+import
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.{AppendCallback,
TransactionSupportedOperation}
+import
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation.{ADD_PARTITION,
DEFAULT_ERROR, GENERIC_ERROR_SUPPORTED}
import org.apache.kafka.server.util.RequestAndCompletionHandler
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -49,7 +52,7 @@ import scala.jdk.CollectionConverters._
class AddPartitionsToTxnManagerTest {
private val networkClient: NetworkClient = mock(classOf[NetworkClient])
private val metadataCache: MetadataCache = mock(classOf[MetadataCache])
- private val partitionFor: String => Int = mock(classOf[String => Int])
+ private val partitionFor: util.function.Function[String, Integer] =
mock(classOf[util.function.Function[String, Integer]])
private val time = new MockTime
@@ -73,7 +76,7 @@ class AddPartitionsToTxnManagerTest {
private val authenticationErrorResponse = clientResponse(null, authException
= new SaslAuthenticationException(""))
private val versionMismatchResponse = clientResponse(null, mismatchException
= new UnsupportedVersionException(""))
private val disconnectedResponse = clientResponse(null, disconnected = true)
- private val transactionSupportedOperation = genericErrorSupported
+ private val transactionSupportedOperation = GENERIC_ERROR_SUPPORTED
private val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1))
@@ -93,14 +96,14 @@ class AddPartitionsToTxnManagerTest {
addPartitionsToTxnManager.shutdown()
}
- private def setErrors(errors: mutable.Map[TopicPartition,
Errors])(callbackErrors: Map[TopicPartition, Errors]): Unit = {
- callbackErrors.foreachEntry(errors.put)
+ private def setErrors(errors: mutable.Map[TopicPartition, Errors]):
AppendCallback = {
+ callbackErrors => callbackErrors.forEach((tp, err) => errors.put(tp, err))
}
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testAddTxnData(isAddPartition: Boolean): Unit = {
- val transactionSupportedOperation = if (isAddPartition) addPartition else
genericErrorSupported
+ val transactionSupportedOperation = if (isAddPartition) ADD_PARTITION else
GENERIC_ERROR_SUPPORTED
when(partitionFor.apply(transactionalId1)).thenReturn(0)
when(partitionFor.apply(transactionalId2)).thenReturn(1)
when(partitionFor.apply(transactionalId3)).thenReturn(0)
@@ -111,9 +114,9 @@ class AddPartitionsToTxnManagerTest {
val transaction2Errors = mutable.Map[TopicPartition, Errors]()
val transaction3Errors = mutable.Map[TopicPartition, Errors]()
- addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1,
producerId1, producerEpoch = 0, topicPartitions, setErrors(transaction1Errors),
transactionSupportedOperation)
- addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId2,
producerId2, producerEpoch = 0, topicPartitions, setErrors(transaction2Errors),
transactionSupportedOperation)
- addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId3,
producerId3, producerEpoch = 0, topicPartitions, setErrors(transaction3Errors),
transactionSupportedOperation)
+ addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1,
producerId1, 0, topicPartitions.asJava, setErrors(transaction1Errors),
transactionSupportedOperation)
+ addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId2,
producerId2, 0, topicPartitions.asJava, setErrors(transaction2Errors),
transactionSupportedOperation)
+ addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId3,
producerId3, 0, topicPartitions.asJava, setErrors(transaction3Errors),
transactionSupportedOperation)
// We will try to add transaction1 3 more times (retries). One will have
the same epoch, one will have a newer epoch, and one will have an older epoch
than the new one we just added.
val transaction1RetryWithSameEpochErrors = mutable.Map[TopicPartition,
Errors]()
@@ -121,17 +124,17 @@ class AddPartitionsToTxnManagerTest {
val transaction1RetryWithOldEpochErrors = mutable.Map[TopicPartition,
Errors]()
// Trying to add more transactional data for the same transactional ID,
producer ID, and epoch should simply replace the old data and send a retriable
response.
- addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1,
producerId1, producerEpoch = 0, topicPartitions,
setErrors(transaction1RetryWithSameEpochErrors), transactionSupportedOperation)
+ addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1,
producerId1, 0, topicPartitions.asJava,
setErrors(transaction1RetryWithSameEpochErrors), transactionSupportedOperation)
val expectedNetworkErrors = topicPartitions.map(_ ->
Errors.NETWORK_EXCEPTION).toMap
assertEquals(expectedNetworkErrors, transaction1Errors)
// Trying to add more transactional data for the same transactional ID and
producer ID, but new epoch should replace the old data and send an error
response for it.
- addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1,
producerId1, producerEpoch = 1, topicPartitions,
setErrors(transaction1RetryWithNewerEpochErrors), transactionSupportedOperation)
+ addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1,
producerId1, 1, topicPartitions.asJava,
setErrors(transaction1RetryWithNewerEpochErrors), transactionSupportedOperation)
val expectedEpochErrors = topicPartitions.map(_ ->
Errors.INVALID_PRODUCER_EPOCH).toMap
assertEquals(expectedEpochErrors, transaction1RetryWithSameEpochErrors)
// Trying to add more transactional data for the same transactional ID and
producer ID, but an older epoch should immediately return with error and keep
the old data queued to send.
- addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1,
producerId1, producerEpoch = 0, topicPartitions,
setErrors(transaction1RetryWithOldEpochErrors), transactionSupportedOperation)
+ addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1,
producerId1, 0, topicPartitions.asJava,
setErrors(transaction1RetryWithOldEpochErrors), transactionSupportedOperation)
assertEquals(expectedEpochErrors, transaction1RetryWithOldEpochErrors)
val requestsAndHandlers =
addPartitionsToTxnManager.generateRequests().asScala
@@ -162,12 +165,12 @@ class AddPartitionsToTxnManagerTest {
mockTransactionStateMetadata(0, 0, Some(node0))
mockTransactionStateMetadata(1, 1, Some(node1))
mockTransactionStateMetadata(2, 2, Some(node2))
- val transactionSupportedOperation = if (isAddPartition) addPartition else
genericErrorSupported
+ val transactionSupportedOperation = if (isAddPartition) ADD_PARTITION else
GENERIC_ERROR_SUPPORTED
val transactionErrors = mutable.Map[TopicPartition, Errors]()
- addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1,
producerId1, producerEpoch = 0, topicPartitions, setErrors(transactionErrors),
transactionSupportedOperation)
- addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId2,
producerId2, producerEpoch = 0, topicPartitions, setErrors(transactionErrors),
transactionSupportedOperation)
+ addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1,
producerId1, 0, topicPartitions.asJava, setErrors(transactionErrors),
transactionSupportedOperation)
+ addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId2,
producerId2, 0, topicPartitions.asJava, setErrors(transactionErrors),
transactionSupportedOperation)
val requestsAndHandlers =
addPartitionsToTxnManager.generateRequests().asScala
assertEquals(2, requestsAndHandlers.size)
@@ -177,8 +180,8 @@ class AddPartitionsToTxnManagerTest {
else verifyRequest(node1, transactionalId2, producerId2,
!isAddPartition, requestAndHandler)
}
- addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId2,
producerId2, producerEpoch = 0, topicPartitions, setErrors(transactionErrors),
transactionSupportedOperation)
- addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId3,
producerId3, producerEpoch = 0, topicPartitions, setErrors(transactionErrors),
transactionSupportedOperation)
+ addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId2,
producerId2, 0, topicPartitions.asJava, setErrors(transactionErrors),
transactionSupportedOperation)
+ addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId3,
producerId3, 0, topicPartitions.asJava, setErrors(transactionErrors),
transactionSupportedOperation)
// Test creationTimeMs increases too.
time.sleep(10)
@@ -209,8 +212,8 @@ class AddPartitionsToTxnManagerTest {
addPartitionsToTxnManager.addOrVerifyTransaction(
transactionalId1,
producerId1,
- producerEpoch = 0,
- topicPartitions,
+ 0,
+ topicPartitions.asJava,
setErrors(errors),
transactionSupportedOperation
)
@@ -245,16 +248,16 @@ class AddPartitionsToTxnManagerTest {
transaction1Errors.clear()
transaction2Errors.clear()
- addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1,
producerId1, producerEpoch = 0, topicPartitions, setErrors(transaction1Errors),
transactionSupportedOperation)
- addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId2,
producerId2, producerEpoch = 0, topicPartitions, setErrors(transaction2Errors),
transactionSupportedOperation)
+ addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1,
producerId1, 0, topicPartitions.asJava, setErrors(transaction1Errors),
transactionSupportedOperation)
+ addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId2,
producerId2, 0, topicPartitions.asJava, setErrors(transaction2Errors),
transactionSupportedOperation)
}
def addTransactionsToVerifyRequestVersion(operationExpected:
TransactionSupportedOperation): Unit = {
transaction1Errors.clear()
transaction2Errors.clear()
- addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1,
producerId1, producerEpoch = 0, topicPartitions, setErrors(transaction1Errors),
operationExpected)
- addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId2,
producerId2, producerEpoch = 0, topicPartitions, setErrors(transaction2Errors),
operationExpected)
+ addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId1,
producerId1, 0, topicPartitions.asJava, setErrors(transaction1Errors),
operationExpected)
+ addPartitionsToTxnManager.addOrVerifyTransaction(transactionalId2,
producerId2, 0, topicPartitions.asJava, setErrors(transaction2Errors),
operationExpected)
}
val expectedAuthErrors = topicPartitions.map(_ ->
Errors.SASL_AUTHENTICATION_FAILED).toMap
@@ -318,12 +321,12 @@ class AddPartitionsToTxnManagerTest {
val expectedTransactionAbortableErrorsTxn1HigherVersion =
topicPartitions.map(_ -> Errors.TRANSACTION_ABORTABLE).toMap
val expectedTransactionAbortableErrorsTxn2HigherVersion = Map(new
TopicPartition("foo", 2) -> Errors.TRANSACTION_ABORTABLE)
- addTransactionsToVerifyRequestVersion(defaultError)
+ addTransactionsToVerifyRequestVersion(DEFAULT_ERROR)
receiveResponse(mixedAbortableErrorsResponse)
assertEquals(expectedTransactionAbortableErrorsTxn1LowerVersion,
transaction1Errors)
assertEquals(expectedTransactionAbortableErrorsTxn2LowerVersion,
transaction2Errors)
- addTransactionsToVerifyRequestVersion(genericErrorSupported)
+ addTransactionsToVerifyRequestVersion(GENERIC_ERROR_SUPPORTED)
receiveResponse(mixedAbortableErrorsResponse)
assertEquals(expectedTransactionAbortableErrorsTxn1HigherVersion,
transaction1Errors)
assertEquals(expectedTransactionAbortableErrorsTxn2HigherVersion,
transaction2Errors)
@@ -351,8 +354,8 @@ class AddPartitionsToTxnManagerTest {
}
val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup],
(mock: KafkaMetricsGroup, context: Context) => {
-
when(mock.newMeter(ArgumentMatchers.eq(AddPartitionsToTxnManager.VerificationFailureRateMetricName),
anyString(), any(classOf[TimeUnit]))).thenReturn(mockVerificationFailureMeter)
-
when(mock.newHistogram(ArgumentMatchers.eq(AddPartitionsToTxnManager.VerificationTimeMsMetricName))).thenReturn(mockVerificationTime)
+
when(mock.newMeter(ArgumentMatchers.eq(AddPartitionsToTxnManager.VERIFICATION_FAILURE_RATE_METRIC_NAME),
anyString(), any(classOf[TimeUnit]))).thenReturn(mockVerificationFailureMeter)
+
when(mock.newHistogram(ArgumentMatchers.eq(AddPartitionsToTxnManager.VERIFICATION_TIME_MS_METRIC_NAME))).thenReturn(mockVerificationTime)
})
val addPartitionsManagerWithMockedMetrics = new AddPartitionsToTxnManager(
@@ -364,8 +367,8 @@ class AddPartitionsToTxnManagerTest {
)
try {
-
addPartitionsManagerWithMockedMetrics.addOrVerifyTransaction(transactionalId1,
producerId1, producerEpoch = 0, topicPartitions, setErrors(transactionErrors),
transactionSupportedOperation)
-
addPartitionsManagerWithMockedMetrics.addOrVerifyTransaction(transactionalId2,
producerId2, producerEpoch = 0, topicPartitions, setErrors(transactionErrors),
transactionSupportedOperation)
+
addPartitionsManagerWithMockedMetrics.addOrVerifyTransaction(transactionalId1,
producerId1, 0, topicPartitions.asJava, setErrors(transactionErrors),
transactionSupportedOperation)
+
addPartitionsManagerWithMockedMetrics.addOrVerifyTransaction(transactionalId2,
producerId2, 0, topicPartitions.asJava, setErrors(transactionErrors),
transactionSupportedOperation)
time.sleep(100)
@@ -386,10 +389,10 @@ class AddPartitionsToTxnManagerTest {
val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
-
verify(mockMetricsGroup).newMeter(ArgumentMatchers.eq(AddPartitionsToTxnManager.VerificationFailureRateMetricName),
anyString(), any(classOf[TimeUnit]))
-
verify(mockMetricsGroup).newHistogram(ArgumentMatchers.eq(AddPartitionsToTxnManager.VerificationTimeMsMetricName))
-
verify(mockMetricsGroup).removeMetric(AddPartitionsToTxnManager.VerificationFailureRateMetricName)
-
verify(mockMetricsGroup).removeMetric(AddPartitionsToTxnManager.VerificationTimeMsMetricName)
+
verify(mockMetricsGroup).newMeter(ArgumentMatchers.eq(AddPartitionsToTxnManager.VERIFICATION_FAILURE_RATE_METRIC_NAME),
anyString(), any(classOf[TimeUnit]))
+
verify(mockMetricsGroup).newHistogram(ArgumentMatchers.eq(AddPartitionsToTxnManager.VERIFICATION_TIME_MS_METRIC_NAME))
+
verify(mockMetricsGroup).removeMetric(AddPartitionsToTxnManager.VERIFICATION_FAILURE_RATE_METRIC_NAME)
+
verify(mockMetricsGroup).removeMetric(AddPartitionsToTxnManager.VERIFICATION_TIME_MS_METRIC_NAME)
// assert that we have verified all invocations on the metrics group.
verifyNoMoreInteractions(mockMetricsGroup)
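
For context, the renamed constants used above come from the new Java class later in this diff: the Scala object's camelCase vals (VerificationFailureRateMetricName, VerificationTimeMsMetricName) become UPPER_SNAKE_CASE public constants with the same metric-name values. A minimal, illustrative Java snippet (not part of the commit) showing how a caller resolves them after the move:

import org.apache.kafka.server.transaction.AddPartitionsToTxnManager;

public class MetricNameConstantsExample {
    public static void main(String[] args) {
        // Same metric names as before the move, now exposed as Java constants.
        System.out.println(AddPartitionsToTxnManager.VERIFICATION_FAILURE_RATE_METRIC_NAME); // VerificationFailureRate
        System.out.println(AddPartitionsToTxnManager.VERIFICATION_TIME_MS_METRIC_NAME);      // VerificationTimeMs
    }
}
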
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 77e8449199c..4f269d5ed7c 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -441,7 +441,7 @@ class KafkaConfigTest {
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
val controllerListenerName = new ListenerName("CONTROLLER")
- assertEquals(Some(SecurityProtocol.PLAINTEXT),
+ assertEquals(SecurityProtocol.PLAINTEXT,
KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(controllerListenerName))
// ensure we don't map it to PLAINTEXT when there is a SSL or SASL
controller listener
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG,
"CONTROLLER,SSL")
@@ -454,7 +454,7 @@ class KafkaConfigTest {
props.remove(SocketServerConfigs.LISTENERS_CONFIG)
// ensure we don't map it to PLAINTEXT when it is explicitly mapped
otherwise
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
"PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
- assertEquals(Some(SecurityProtocol.SSL),
+ assertEquals(SecurityProtocol.SSL,
KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(controllerListenerName))
// ensure we don't map it to PLAINTEXT when anything is explicitly given
// (i.e. it is only part of the default value, even with KRaft)
@@ -463,7 +463,7 @@ class KafkaConfigTest {
// ensure we can map it to a non-PLAINTEXT security protocol by default
(i.e. when nothing is given)
props.remove(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
- assertEquals(Some(SecurityProtocol.SSL),
+ assertEquals(SecurityProtocol.SSL,
KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new
ListenerName("SSL")))
}
@@ -475,9 +475,9 @@ class KafkaConfigTest {
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG,
"CONTROLLER1,CONTROLLER2")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9092")
- assertEquals(Some(SecurityProtocol.PLAINTEXT),
+ assertEquals(SecurityProtocol.PLAINTEXT,
KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new
ListenerName("CONTROLLER1")))
- assertEquals(Some(SecurityProtocol.PLAINTEXT),
+ assertEquals(SecurityProtocol.PLAINTEXT,
KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new
ListenerName("CONTROLLER2")))
}
@@ -511,11 +511,11 @@ class KafkaConfigTest {
new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9093))
assertEquals(expectedListeners, config.listeners)
assertEquals(expectedListeners, config.effectiveAdvertisedBrokerListeners)
- val expectedSecurityProtocolMap = Map(
- new ListenerName("CLIENT") -> SecurityProtocol.SSL,
- new ListenerName("REPLICATION") -> SecurityProtocol.SSL,
- new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT,
- new ListenerName("CONTROLLER") -> SecurityProtocol.PLAINTEXT
+ val expectedSecurityProtocolMap = util.Map.of(
+ new ListenerName("CLIENT"), SecurityProtocol.SSL,
+ new ListenerName("REPLICATION"), SecurityProtocol.SSL,
+ new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT,
+ new ListenerName("CONTROLLER"), SecurityProtocol.PLAINTEXT
)
assertEquals(expectedSecurityProtocolMap,
config.effectiveListenerSecurityProtocolMap)
}
@@ -546,10 +546,10 @@ class KafkaConfigTest {
)
assertEquals(expectedAdvertisedListeners,
config.effectiveAdvertisedBrokerListeners)
- val expectedSecurityProtocolMap = Map(
- new ListenerName("EXTERNAL") -> SecurityProtocol.SSL,
- new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT,
- new ListenerName("CONTROLLER") -> SecurityProtocol.PLAINTEXT
+ val expectedSecurityProtocolMap = util.Map.of(
+ new ListenerName("EXTERNAL"), SecurityProtocol.SSL,
+ new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT,
+ new ListenerName("CONTROLLER"), SecurityProtocol.PLAINTEXT
)
assertEquals(expectedSecurityProtocolMap,
config.effectiveListenerSecurityProtocolMap)
}
@@ -597,7 +597,7 @@ class KafkaConfigTest {
}
private def listenerListToEndPoints(listenerList: String,
- securityProtocolMap:
collection.Map[ListenerName, SecurityProtocol] =
SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO.asScala) =
+ securityProtocolMap: util.Map[ListenerName,
SecurityProtocol] = SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO) =
CoreUtils.listenerListToEndPoints(listenerList, securityProtocolMap)
@Test
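
The assertEquals changes earlier in this file (Some(SecurityProtocol.PLAINTEXT) becoming SecurityProtocol.PLAINTEXT) follow from effectiveListenerSecurityProtocolMap now returning a java.util.Map rather than a Scala Map, so get() yields the SecurityProtocol itself or null instead of an Option. A small, self-contained sketch of the lookup semantics (illustrative only; the map contents are hypothetical):

import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;

import java.util.Map;

public class ListenerLookupExample {
    public static void main(String[] args) {
        Map<ListenerName, SecurityProtocol> protocolMap = Map.of(
            new ListenerName("CONTROLLER"), SecurityProtocol.PLAINTEXT,
            new ListenerName("SSL"), SecurityProtocol.SSL);
        // get() returns the value directly, hence assertEquals(SecurityProtocol.PLAINTEXT, ...)
        // instead of assertEquals(Some(SecurityProtocol.PLAINTEXT), ...).
        System.out.println(protocolMap.get(new ListenerName("CONTROLLER"))); // PLAINTEXT
        System.out.println(protocolMap.get(new ListenerName("MISSING")));    // null, not None
    }
}
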
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index b19de2dde1a..cdae51bb935 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -69,6 +69,9 @@ import org.apache.kafka.server.share.SharePartitionKey
import org.apache.kafka.server.share.fetch.{DelayedShareFetchGroupKey,
DelayedShareFetchKey, ShareFetch}
import org.apache.kafka.server.share.metrics.ShareGroupMetrics
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams,
FetchPartitionData}
+import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
+import
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation
+import
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation.{ADD_PARTITION,
GENERIC_ERROR_SUPPORTED}
import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
@@ -155,7 +158,7 @@ class ReplicaManagerTest {
// Anytime we try to verify, just automatically run the callback as though
the transaction was verified.
when(addPartitionsToTxnManager.addOrVerifyTransaction(any(), any(), any(),
any(), any(), any())).thenAnswer { invocationOnMock =>
val callback = invocationOnMock.getArgument(4,
classOf[AddPartitionsToTxnManager.AppendCallback])
- callback(Map.empty[TopicPartition, Errors].toMap)
+ callback.complete(util.Map.of())
}
// make sure metadataCache can map between topic name and id
setupMetadataCacheWithTopicIds(topicIds, metadataCache)
@@ -2195,7 +2198,7 @@ class ReplicaManagerTest {
ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch),
- ArgumentMatchers.eq(Seq(tp0)),
+ ArgumentMatchers.eq(util.List.of(tp0)),
any[AddPartitionsToTxnManager.AppendCallback](),
any()
)
@@ -2232,7 +2235,7 @@ class ReplicaManagerTest {
ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch),
- ArgumentMatchers.eq(Seq(tp0)),
+ ArgumentMatchers.eq(util.List.of(tp0)),
appendCallback.capture(),
any()
)
@@ -2241,7 +2244,7 @@ class ReplicaManagerTest {
// Confirm we did not write to the log and instead returned error.
val callback: AddPartitionsToTxnManager.AppendCallback =
appendCallback.getValue
- callback(Map(tp0 -> Errors.INVALID_TXN_STATE).toMap)
+ callback.complete(util.Map.of(tp0, Errors.INVALID_TXN_STATE))
assertEquals(Errors.INVALID_TXN_STATE, result.assertFired.error)
assertEquals(verificationGuard, getVerificationGuard(replicaManager,
tp0, producerId))
@@ -2252,14 +2255,14 @@ class ReplicaManagerTest {
ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch),
- ArgumentMatchers.eq(Seq(tp0)),
+ ArgumentMatchers.eq(util.List.of(tp0)),
appendCallback2.capture(),
any()
)
assertEquals(verificationGuard, getVerificationGuard(replicaManager,
tp0, producerId))
val callback2: AddPartitionsToTxnManager.AppendCallback =
appendCallback2.getValue
- callback2(Map.empty[TopicPartition, Errors].toMap)
+ callback2.complete(util.Map.of())
assertEquals(VerificationGuard.SENTINEL,
getVerificationGuard(replicaManager, tp0, producerId))
assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId,
producerEpoch))
} finally {
@@ -2295,13 +2298,13 @@ class ReplicaManagerTest {
// We should add these partitions to the manager to verify.
val result = handleProduceAppend(replicaManager, tp0,
transactionalRecords, origin = AppendOrigin.CLIENT,
- transactionalId = transactionalId, transactionSupportedOperation =
addPartition)
+ transactionalId = transactionalId, transactionSupportedOperation =
ADD_PARTITION)
val appendCallback =
ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
verify(addPartitionsToTxnManager, times(1)).addOrVerifyTransaction(
ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch),
- ArgumentMatchers.eq(Seq(tp0)),
+ ArgumentMatchers.eq(util.List.of(tp0)),
appendCallback.capture(),
any()
)
@@ -2310,7 +2313,7 @@ class ReplicaManagerTest {
// Confirm we did not write to the log and instead returned error.
var callback: AddPartitionsToTxnManager.AppendCallback =
appendCallback.getValue
- callback(Map(tp0 -> error).toMap)
+ callback.complete(util.Map.of(tp0, error))
if (error != Errors.CONCURRENT_TRANSACTIONS) {
// NOT_COORDINATOR is converted to NOT_ENOUGH_REPLICAS
@@ -2327,12 +2330,12 @@ class ReplicaManagerTest {
ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch),
- ArgumentMatchers.eq(Seq(tp0)),
+ ArgumentMatchers.eq(util.List.of(tp0)),
appendCallback.capture(),
any()
)
callback = appendCallback.getValue
- callback(Map.empty[TopicPartition, Errors].toMap)
+ callback.complete(util.Map.of())
assertEquals(VerificationGuard.SENTINEL,
getVerificationGuard(replicaManager, tp0, producerId))
assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId,
producerEpoch))
}
@@ -2366,7 +2369,7 @@ class ReplicaManagerTest {
ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch),
- ArgumentMatchers.eq(Seq(tp0)),
+ ArgumentMatchers.eq(util.List.of(tp0)),
appendCallback.capture(),
any()
)
@@ -2375,7 +2378,7 @@ class ReplicaManagerTest {
// Confirm we did not write to the log and instead returned error.
val callback: AddPartitionsToTxnManager.AppendCallback =
appendCallback.getValue
- callback(Map(tp0 -> Errors.INVALID_PRODUCER_ID_MAPPING).toMap)
+ callback.complete(util.Map.of(tp0, Errors.INVALID_PRODUCER_ID_MAPPING))
assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING,
result.assertFired.error)
assertEquals(verificationGuard, getVerificationGuard(replicaManager,
tp0, producerId))
@@ -2389,7 +2392,7 @@ class ReplicaManagerTest {
ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch),
- ArgumentMatchers.eq(Seq(tp0)),
+ ArgumentMatchers.eq(util.List.of(tp0)),
appendCallback2.capture(),
any()
)
@@ -2397,7 +2400,7 @@ class ReplicaManagerTest {
// Verification should succeed, but we expect to fail with
OutOfOrderSequence and for the VerificationGuard to remain.
val callback2: AddPartitionsToTxnManager.AppendCallback =
appendCallback2.getValue
- callback2(Map.empty[TopicPartition, Errors].toMap)
+ callback2.complete(util.Map.of())
assertEquals(verificationGuard, getVerificationGuard(replicaManager,
tp0, producerId))
assertEquals(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER,
result2.assertFired.error)
} finally {
@@ -2445,7 +2448,7 @@ class ReplicaManagerTest {
ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch),
- ArgumentMatchers.eq(Seq(tp0)),
+ ArgumentMatchers.eq(util.List.of(tp0)),
appendCallback.capture(),
any()
)
@@ -2455,7 +2458,7 @@ class ReplicaManagerTest {
// simulate successful verification
val callback: AddPartitionsToTxnManager.AppendCallback =
appendCallback.getValue
- callback(Map.empty[TopicPartition, Errors].toMap)
+ callback.complete(util.Map.of())
assertEquals(VerificationGuard.SENTINEL,
getVerificationGuard(replicaManager, tp0, producerId))
assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId,
producerEpoch))
@@ -2652,7 +2655,7 @@ class ReplicaManagerTest {
ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch),
- ArgumentMatchers.eq(Seq(tp0)),
+ ArgumentMatchers.eq(util.List.of(tp0)),
appendCallback.capture(),
any()
)
@@ -2669,7 +2672,7 @@ class ReplicaManagerTest {
// Confirm we did not write to the log and instead returned error.
val callback: AddPartitionsToTxnManager.AppendCallback =
appendCallback.getValue
- callback(Map(tp0 -> Errors.INVALID_TXN_STATE).toMap)
+ callback.complete(util.Map.of(tp0, Errors.INVALID_TXN_STATE))
assertEquals(Errors.INVALID_TXN_STATE, result.assertFired.error)
assertEquals(verificationGuard, getVerificationGuard(replicaManager,
tp0, producerId))
@@ -2711,20 +2714,20 @@ class ReplicaManagerTest {
// Start verification and return the coordinator related errors.
val expectedMessage = s"Unable to verify the partition has been added to
the transaction. Underlying error: ${error.toString}"
- val result = handleProduceAppend(replicaManager, tp0,
transactionalRecords, transactionalId = transactionalId,
transactionSupportedOperation = addPartition)
+ val result = handleProduceAppend(replicaManager, tp0,
transactionalRecords, transactionalId = transactionalId,
transactionSupportedOperation = ADD_PARTITION)
val appendCallback =
ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
verify(addPartitionsToTxnManager, times(1)).addOrVerifyTransaction(
ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch),
- ArgumentMatchers.eq(Seq(tp0)),
+ ArgumentMatchers.eq(util.List.of(tp0)),
appendCallback.capture(),
any()
)
// Confirm we did not write to the log and instead returned the
converted error with the correct error message.
val callback: AddPartitionsToTxnManager.AppendCallback =
appendCallback.getValue
- callback(Map(tp0 -> error).toMap)
+ callback.complete(util.Map.of(tp0, error))
assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
assertEquals(expectedMessage, result.assertFired.errorMessage)
} finally {
@@ -2767,14 +2770,14 @@ class ReplicaManagerTest {
ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch),
- ArgumentMatchers.eq(Seq(tp0)),
+ ArgumentMatchers.eq(util.List.of(tp0)),
appendCallback.capture(),
any()
)
// Confirm we did not write to the log and instead returned the
converted error with the correct error message.
val callback: AddPartitionsToTxnManager.AppendCallback =
appendCallback.getValue
- callback(Map(tp0 -> error).toMap)
+ callback.complete(util.Map.of(tp0, error))
assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
assertEquals(expectedMessage, result.assertFired.errorMessage)
} finally {
@@ -2798,7 +2801,7 @@ class ReplicaManagerTest {
ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch),
- ArgumentMatchers.eq(Seq(tp0)),
+ ArgumentMatchers.eq(util.List.of(tp0)),
appendCallback.capture(),
any()
)
@@ -3088,7 +3091,7 @@ class ReplicaManagerTest {
entriesToAppend:
Map[TopicPartition, MemoryRecords],
transactionalId: String,
requiredAcks: Short = -1,
-
transactionSupportedOperation: TransactionSupportedOperation =
genericErrorSupported
+
transactionSupportedOperation: TransactionSupportedOperation =
GENERIC_ERROR_SUPPORTED
):
CallbackResult[Map[TopicIdPartition, PartitionResponse]] = {
val result = new CallbackResult[Map[TopicIdPartition, PartitionResponse]]()
def appendCallback(responses: Map[TopicIdPartition, PartitionResponse]):
Unit = {
@@ -3115,7 +3118,7 @@ class ReplicaManagerTest {
origin: AppendOrigin = AppendOrigin.CLIENT,
requiredAcks: Short = -1,
transactionalId: String,
- transactionSupportedOperation:
TransactionSupportedOperation = genericErrorSupported
+ transactionSupportedOperation:
TransactionSupportedOperation = GENERIC_ERROR_SUPPORTED
): CallbackResult[PartitionResponse] = {
val result = new CallbackResult[PartitionResponse]()
@@ -3148,7 +3151,7 @@ class ReplicaManagerTest {
producerId: Long,
producerEpoch:
Short,
baseSequence: Int
= 0,
-
transactionSupportedOperation: TransactionSupportedOperation =
genericErrorSupported
+
transactionSupportedOperation: TransactionSupportedOperation =
GENERIC_ERROR_SUPPORTED
):
CallbackResult[Either[Errors, VerificationGuard]] = {
val result = new CallbackResult[Either[Errors, VerificationGuard]]()
def postVerificationCallback(errorAndGuard: (Errors, VerificationGuard)):
Unit = {
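
The callback-invocation changes in this file reflect that AppendCallback is now a Java @FunctionalInterface with a single complete(Map<TopicPartition, Errors>) method (defined in the new class later in this diff) rather than a Scala function value. A minimal illustrative sketch, not taken from the test code:

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager.AppendCallback;

import java.util.Map;

public class AppendCallbackExample {
    public static void main(String[] args) {
        // A callback that simply prints per-partition verification errors.
        AppendCallback callback = partitionErrors ->
            partitionErrors.forEach((tp, error) -> System.out.println(tp + " -> " + error));
        callback.complete(Map.of(new TopicPartition("foo", 0), Errors.INVALID_TXN_STATE));
        callback.complete(Map.of()); // empty map means verification succeeded for all partitions
    }
}
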
diff --git
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index 0cb4be79f2a..16d61722727 100644
---
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -16,9 +16,13 @@
*/
package org.apache.kafka.server.config;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
@@ -36,8 +40,10 @@ import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.storage.internals.log.LogConfig;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
/**
* During moving {@link kafka.server.KafkaConfig} out of core
AbstractKafkaConfig will be the future KafkaConfig
@@ -91,4 +97,113 @@ public abstract class AbstractKafkaConfig extends
AbstractConfig {
public int backgroundThreads() {
return getInt(ServerConfigs.BACKGROUND_THREADS_CONFIG);
}
+
+ public int brokerId() {
+ return getInt(ServerConfigs.BROKER_ID_CONFIG);
+ }
+
+ public int requestTimeoutMs() {
+ return getInt(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG);
+ }
+
+ public List<String> controllerListenerNames() {
+ return
Csv.parseCsvList(getString(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG));
+ }
+
+ public ListenerName interBrokerListenerName() {
+ return interBrokerListenerNameAndSecurityProtocol().getKey();
+ }
+
+ public SecurityProtocol interBrokerSecurityProtocol() {
+ return interBrokerListenerNameAndSecurityProtocol().getValue();
+ }
+
+ public Map<ListenerName, SecurityProtocol>
effectiveListenerSecurityProtocolMap() {
+ Map<ListenerName, SecurityProtocol> mapValue =
+
getMap(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
+
getString(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG))
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(
+ e -> ListenerName.normalised(e.getKey()),
+ e -> securityProtocol(
+ e.getValue(),
+
SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)));
+
+ if
(!originals().containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG))
{
+ // Using the default configuration since
listener.security.protocol.map is not explicitly set.
+ // Before adding default PLAINTEXT mappings for controller
listeners, verify that:
+ // 1. No SSL or SASL protocols are used in controller listeners
+ // 2. No SSL or SASL protocols are used in regular listeners
(Note: controller listeners
+ // are not included in 'listeners' config when
process.roles=broker)
+ if
(controllerListenerNames().stream().anyMatch(AbstractKafkaConfig::isSslOrSasl)
||
+
Csv.parseCsvList(getString(SocketServerConfigs.LISTENERS_CONFIG)).stream()
+ .anyMatch(listenerName ->
isSslOrSasl(parseListenerName(listenerName)))) {
+ return mapValue;
+ } else {
+ // Add the PLAINTEXT mappings for all controller listener
names that are not explicitly PLAINTEXT
+ mapValue.putAll(controllerListenerNames().stream()
+ .filter(listenerName ->
!SecurityProtocol.PLAINTEXT.name.equals(listenerName))
+ .collect(Collectors.toMap(ListenerName::new, ignored
-> SecurityProtocol.PLAINTEXT)));
+ return mapValue;
+ }
+ } else {
+ return mapValue;
+ }
+ }
+
+ public static Map<String, String> getMap(String propName, String
propValue) {
+ try {
+ return Csv.parseCsvMap(propValue);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format("Error parsing configuration property '%s':
%s", propName, e.getMessage()));
+ }
+ }
+
+ private static SecurityProtocol securityProtocol(String protocolName,
String configName) {
+ try {
+ return SecurityProtocol.forName(protocolName);
+ } catch (IllegalArgumentException e) {
+ throw new ConfigException(
+ String.format("Invalid security protocol `%s` defined in
%s", protocolName, configName));
+ }
+ }
+
+ private Map.Entry<ListenerName, SecurityProtocol>
interBrokerListenerNameAndSecurityProtocol() {
+ String interBrokerListenerName =
getString(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG);
+ if (interBrokerListenerName != null) {
+ if
(originals().containsKey(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG))
{
+ throw new ConfigException(String.format("Only one of %s and %s
should be set.",
+ ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG,
+
ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG));
+ }
+ ListenerName listenerName =
ListenerName.normalised(interBrokerListenerName);
+ SecurityProtocol securityProtocol =
effectiveListenerSecurityProtocolMap().get(listenerName);
+ if (securityProtocol == null) {
+ throw new ConfigException("Listener with name " +
listenerName.value() + " defined in " +
+ ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG +
" not found in " +
+
SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG + ".");
+ }
+ return Map.entry(listenerName, securityProtocol);
+ } else {
+ SecurityProtocol securityProtocol = securityProtocol(
+
getString(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG),
+ ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG);
+ return
Map.entry(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol);
+ }
+ }
+
+ private static boolean isSslOrSasl(String name) {
+ return name.equals(SecurityProtocol.SSL.name) ||
name.equals(SecurityProtocol.SASL_SSL.name) ||
+ name.equals(SecurityProtocol.SASL_PLAINTEXT.name);
+ }
+
+ private static String parseListenerName(String connectionString) {
+ int firstColon = connectionString.indexOf(':');
+ if (firstColon < 0) {
+ throw new KafkaException("Unable to parse a listener name from " +
connectionString);
+ }
+ return connectionString.substring(0,
firstColon).toUpperCase(Locale.ROOT);
+ }
}
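
For readers following the new AbstractKafkaConfig helpers, the first step of effectiveListenerSecurityProtocolMap() is a CSV-map parse plus per-entry normalisation. The hedged sketch below reproduces that step with a hard-coded property value (hypothetical; the real code reads the value from the config):

import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.config.AbstractKafkaConfig;

import java.util.Map;
import java.util.stream.Collectors;

public class ListenerProtocolMapParsingExample {
    public static void main(String[] args) {
        String propValue = "CLIENT:SSL,REPLICATION:SSL,INTERNAL:PLAINTEXT";
        // getMap wraps Csv.parseCsvMap and rethrows parse failures with the property name.
        Map<ListenerName, SecurityProtocol> parsed =
            AbstractKafkaConfig.getMap("listener.security.protocol.map", propValue)
                .entrySet().stream()
                .collect(Collectors.toMap(
                    e -> ListenerName.normalised(e.getKey()),
                    e -> SecurityProtocol.forName(e.getValue())));
        parsed.forEach((listener, protocol) -> System.out.println(listener + " -> " + protocol));
    }
}
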
diff --git
a/server/src/main/java/org/apache/kafka/server/transaction/AddPartitionsToTxnManager.java
b/server/src/main/java/org/apache/kafka/server/transaction/AddPartitionsToTxnManager.java
new file mode 100644
index 00000000000..b0db7c823f7
--- /dev/null
+++
b/server/src/main/java/org/apache/kafka/server/transaction/AddPartitionsToTxnManager.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.transaction;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Topic;
+import
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
+import
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
+import
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTransaction;
+import
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTransactionCollection;
+import org.apache.kafka.common.message.AddPartitionsToTxnResponseData;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.MetadataCache;
+import org.apache.kafka.server.config.AbstractKafkaConfig;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.server.util.InterBrokerSendThread;
+import org.apache.kafka.server.util.RequestAndCompletionHandler;
+
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation.ADD_PARTITION;
+import static
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation.DEFAULT_ERROR;
+import static
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation.GENERIC_ERROR_SUPPORTED;
+
+public class AddPartitionsToTxnManager extends InterBrokerSendThread {
+
+ public static final String VERIFICATION_FAILURE_RATE_METRIC_NAME =
"VerificationFailureRate";
+ public static final String VERIFICATION_TIME_MS_METRIC_NAME =
"VerificationTimeMs";
+
+ /**
+ * Handles the partition response based on the request version and the
exact operation.
+ */
+ public enum TransactionSupportedOperation {
+ /**
+ * This is the default workflow, which maps to cases where the Produce or
TxnOffsetCommit request version
+ * is lower than the first version supporting the new error class.
+ */
+ DEFAULT_ERROR(false),
+ /**
+ * This maps to the case when the clients are updated to handle the
TransactionAbortableException.
+ */
+ GENERIC_ERROR_SUPPORTED(false),
+ /**
+ * This allows the partition to be added to the in-flight transactions with
the Produce and TxnOffsetCommit requests,
+ * in addition to the behaviors in GENERIC_ERROR_SUPPORTED.
+ */
+ ADD_PARTITION(true);
+
+ public final boolean supportsEpochBump;
+
+ TransactionSupportedOperation(boolean supportsEpochBump) {
+ this.supportsEpochBump = supportsEpochBump;
+ }
+ }
+
+ @FunctionalInterface
+ public interface AppendCallback {
+ void complete(Map<TopicPartition, Errors> partitionErrors);
+ }
+
+ public static TransactionSupportedOperation
produceRequestVersionToTransactionSupportedOperation(short version) {
+ if (version > 11) {
+ return ADD_PARTITION;
+ } else if (version > 10) {
+ return GENERIC_ERROR_SUPPORTED;
+ } else {
+ return DEFAULT_ERROR;
+ }
+ }
+
+ public static TransactionSupportedOperation
txnOffsetCommitRequestVersionToTransactionSupportedOperation(int version) {
+ if (version > 4) {
+ return ADD_PARTITION;
+ } else if (version > 3) {
+ return GENERIC_ERROR_SUPPORTED;
+ } else {
+ return DEFAULT_ERROR;
+ }
+ }
+
+ /*
+ * Data structure to hold the transactional data to send to a node. Note
-- at most one request per transactional ID
+ * will exist at a time in the map. If a given transactional ID exists in
the map, and a new request with the same ID
+ * comes in, one request will be in the map and one will return to the
producer with a response depending on the epoch.
+ */
+ public record TransactionDataAndCallbacks(
+ AddPartitionsToTxnTransactionCollection transactionData,
+ Map<String, AppendCallback> callbacks,
+ Map<String, Long> startTimeMs,
+ TransactionSupportedOperation transactionSupportedOperation) { }
+
+ private class AddPartitionsToTxnHandler implements
RequestCompletionHandler {
+ private final Node node;
+ private final TransactionDataAndCallbacks transactionDataAndCallbacks;
+
+ public AddPartitionsToTxnHandler(Node node,
TransactionDataAndCallbacks transactionDataAndCallbacks) {
+ this.node = node;
+ this.transactionDataAndCallbacks = transactionDataAndCallbacks;
+ }
+
+ @Override
+ public void onComplete(ClientResponse response) {
+ // Note: Synchronization is not needed on inflightNodes since it
is always accessed from this thread.
+ inflightNodes.remove(node);
+ if (response.authenticationException() != null) {
+ log.error("AddPartitionsToTxnRequest failed for node {} with
an authentication exception.", response.destination(),
response.authenticationException());
+
sendCallbacksToAll(Errors.forException(response.authenticationException()).code());
+ } else if (response.versionMismatch() != null) {
+ // We may see an unsupported version exception if we try to send
a verify-only request to a broker that can't handle it.
+ // In this case, skip verification.
+ log.warn("AddPartitionsToTxnRequest failed for node {} with
invalid version exception. " +
+ "This suggests verification is not supported.
Continuing handling the produce request.", response.destination());
+ transactionDataAndCallbacks.callbacks().forEach((txnId,
callback) ->
+ sendCallback(callback, Map.of(),
transactionDataAndCallbacks.startTimeMs.get(txnId)));
+ } else if (response.wasDisconnected() || response.wasTimedOut()) {
+ log.warn("AddPartitionsToTxnRequest failed for node {} with a
network exception.", response.destination());
+ sendCallbacksToAll(Errors.NETWORK_EXCEPTION.code());
+ } else {
+ AddPartitionsToTxnResponseData responseData =
((AddPartitionsToTxnResponse) response.responseBody()).data();
+ if (responseData.errorCode() != 0) {
+ log.error("AddPartitionsToTxnRequest for node {} returned
with error {}.",
+ response.destination(),
Errors.forCode(responseData.errorCode()));
+ // The client should not be exposed to
CLUSTER_AUTHORIZATION_FAILED so modify the error to signify the verification
did not complete.
+ // Return INVALID_TXN_STATE.
+ short finalError = responseData.errorCode() ==
Errors.CLUSTER_AUTHORIZATION_FAILED.code()
+ ? Errors.INVALID_TXN_STATE.code() :
responseData.errorCode();
+ sendCallbacksToAll(finalError);
+ } else {
+ for
(AddPartitionsToTxnResponseData.AddPartitionsToTxnResult txnResult :
responseData.resultsByTransaction()) {
+ Map<TopicPartition, Errors> unverified = new
HashMap<>();
+ for
(AddPartitionsToTxnResponseData.AddPartitionsToTxnTopicResult topicResult :
txnResult.topicResults()) {
+ for
(AddPartitionsToTxnResponseData.AddPartitionsToTxnPartitionResult
partitionResult : topicResult.resultsByPartition()) {
+ TopicPartition tp = new
TopicPartition(topicResult.name(), partitionResult.partitionIndex());
+ if (partitionResult.partitionErrorCode() !=
Errors.NONE.code()) {
+ // Producers expect to handle
INVALID_PRODUCER_EPOCH in this scenario.
+ short code;
+ if (partitionResult.partitionErrorCode()
== Errors.PRODUCER_FENCED.code()) {
+ code =
Errors.INVALID_PRODUCER_EPOCH.code();
+ } else if
(partitionResult.partitionErrorCode() == Errors.TRANSACTION_ABORTABLE.code()
+ &&
transactionDataAndCallbacks.transactionSupportedOperation().equals(DEFAULT_ERROR))
{ // For backward compatibility with clients
+ code = Errors.INVALID_TXN_STATE.code();
+ } else {
+ code =
partitionResult.partitionErrorCode();
+ }
+ unverified.put(tp, Errors.forCode(code));
+ }
+ }
+ }
+ verificationFailureRate.mark(unverified.size());
+ AppendCallback callback =
transactionDataAndCallbacks.callbacks().get(txnResult.transactionalId());
+ sendCallback(callback, unverified,
transactionDataAndCallbacks.startTimeMs.get(txnResult.transactionalId()));
+ }
+ }
+ }
+ wakeup();
+ }
+
+ private Map<TopicPartition, Errors> buildErrorMap(String
transactionalId, short errorCode) {
+ AddPartitionsToTxnTransaction transactionData =
transactionDataAndCallbacks.transactionData.find(transactionalId);
+ return topicPartitionsToError(transactionData,
Errors.forCode(errorCode));
+ }
+
+ private void sendCallbacksToAll(short errorCode) {
+ transactionDataAndCallbacks.callbacks.forEach((txnId, cb) ->
+ sendCallback(cb, buildErrorMap(txnId, errorCode),
transactionDataAndCallbacks.startTimeMs.get(txnId)));
+ }
+ }
+
+ private final MetadataCache metadataCache;
+ private final Function<String, Integer> partitionFor;
+ private final Time time;
+
+ private final ListenerName interBrokerListenerName;
+ private final Set<Node> inflightNodes = new HashSet<>();
+ private final Map<Node, TransactionDataAndCallbacks> nodesToTransactions =
new HashMap<>();
+
+ // For compatibility - this metrics group was previously defined within
+ // a Scala class named `kafka.server.AddPartitionsToTxnManager`
+ private final KafkaMetricsGroup metricsGroup = new
KafkaMetricsGroup("kafka.server", "AddPartitionsToTxnManager");
+ private final Meter verificationFailureRate =
metricsGroup.newMeter(VERIFICATION_FAILURE_RATE_METRIC_NAME, "failures",
TimeUnit.SECONDS);
+ private final Histogram verificationTimeMs =
metricsGroup.newHistogram(VERIFICATION_TIME_MS_METRIC_NAME);
+
+ public AddPartitionsToTxnManager(
+ AbstractKafkaConfig config,
+ NetworkClient client,
+ MetadataCache metadataCache,
+ Function<String, Integer> partitionFor,
+ Time time) {
+ super("AddPartitionsToTxnSenderThread-" + config.brokerId(), client,
config.requestTimeoutMs(), time);
+ this.interBrokerListenerName = config.interBrokerListenerName();
+ this.metadataCache = metadataCache;
+ this.partitionFor = partitionFor;
+ this.time = time;
+ }
+
+ public void addOrVerifyTransaction(
+ String transactionalId,
+ long producerId,
+ short producerEpoch,
+ Collection<TopicPartition> topicPartitions,
+ AppendCallback callback,
+ TransactionSupportedOperation transactionSupportedOperation) {
+ Optional<Node> coordinator =
getTransactionCoordinator(partitionFor.apply(transactionalId));
+ if (coordinator.isEmpty()) {
+ callback.complete(topicPartitions.stream().collect(
+ Collectors.toMap(Function.identity(), tp ->
Errors.COORDINATOR_NOT_AVAILABLE)));
+ } else {
+ AddPartitionsToTxnTopicCollection topicCollection = new
AddPartitionsToTxnTopicCollection();
+
topicPartitions.stream().collect(Collectors.groupingBy(TopicPartition::topic)).forEach((topic,
tps) -> {
+ topicCollection.add(new AddPartitionsToTxnTopic()
+ .setName(topic)
+
.setPartitions(tps.stream().map(TopicPartition::partition).collect(Collectors.toList())));
+ });
+
+ AddPartitionsToTxnTransaction transactionData = new
AddPartitionsToTxnTransaction()
+ .setTransactionalId(transactionalId)
+ .setProducerId(producerId)
+ .setProducerEpoch(producerEpoch)
+
.setVerifyOnly(!transactionSupportedOperation.supportsEpochBump)
+ .setTopics(topicCollection);
+
+ addTxnData(coordinator.get(), transactionData, callback,
transactionSupportedOperation);
+ }
+ }
+
+ private void addTxnData(
+ Node node,
+ AddPartitionsToTxnTransaction transactionData,
+ AppendCallback callback,
+ TransactionSupportedOperation transactionSupportedOperation) {
+ synchronized (nodesToTransactions) {
+ long curTime = time.milliseconds();
+ // Check whether we already have data for either the node or the
individual transaction. Add the node if it isn't there.
+ TransactionDataAndCallbacks existingNodeAndTransactionData =
nodesToTransactions.computeIfAbsent(node,
+ ignored -> new TransactionDataAndCallbacks(
+ new AddPartitionsToTxnTransactionCollection(1),
+ new HashMap<>(),
+ new HashMap<>(),
+ transactionSupportedOperation));
+
+ AddPartitionsToTxnTransaction existingTransactionData =
existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId());
+
+ // There are 3 cases if we already have existing data
+ // 1. Incoming data has a higher epoch -- return
INVALID_PRODUCER_EPOCH for existing data since it is fenced
+ // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION
for existing data, since the client is likely retrying and we want another
retriable exception
+ // 3. Incoming data has a lower epoch -- return
INVALID_PRODUCER_EPOCH for the incoming data since it is fenced, do not add
incoming data to verify
+ if (existingTransactionData != null) {
+ if (existingTransactionData.producerEpoch() <=
transactionData.producerEpoch()) {
+ Errors error = (existingTransactionData.producerEpoch() <
transactionData.producerEpoch())
+ ? Errors.INVALID_PRODUCER_EPOCH :
Errors.NETWORK_EXCEPTION;
+ AppendCallback oldCallback =
existingNodeAndTransactionData.callbacks.get(transactionData.transactionalId());
+
existingNodeAndTransactionData.transactionData.remove(transactionData);
+ sendCallback(oldCallback,
topicPartitionsToError(existingTransactionData, error),
existingNodeAndTransactionData.startTimeMs.get(transactionData.transactionalId()));
+ } else {
+ // If the incoming transactionData's epoch is lower, we
can return with INVALID_PRODUCER_EPOCH immediately.
+ sendCallback(callback,
topicPartitionsToError(transactionData, Errors.INVALID_PRODUCER_EPOCH),
curTime);
+ return;
+ }
+ }
+
+
existingNodeAndTransactionData.transactionData.add(transactionData);
+
existingNodeAndTransactionData.callbacks.put(transactionData.transactionalId(),
callback);
+
existingNodeAndTransactionData.startTimeMs.put(transactionData.transactionalId(),
curTime);
+ wakeup();
+ }
+ }
+
+ private Optional<Node> getTransactionCoordinator(int partition) {
+ return
metadataCache.getLeaderAndIsr(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)
+ .filter(leaderAndIsr -> leaderAndIsr.leader() !=
MetadataResponse.NO_LEADER_ID)
+ .flatMap(metadata ->
metadataCache.getAliveBrokerNode(metadata.leader(), interBrokerListenerName));
+ }
+
+ private Map<TopicPartition, Errors>
topicPartitionsToError(AddPartitionsToTxnTransaction txnData, Errors error) {
+ Map<TopicPartition, Errors> topicPartitionsToError = new HashMap<>();
+ txnData.topics().forEach(topic ->
+ topic.partitions().forEach(partition ->
+ topicPartitionsToError.put(new TopicPartition(topic.name(),
partition), error)));
+ verificationFailureRate.mark(topicPartitionsToError.size());
+ return topicPartitionsToError;
+ }
+
+ private void sendCallback(AppendCallback callback, Map<TopicPartition,
Errors> errors, long startTimeMs) {
+ verificationTimeMs.update(time.milliseconds() - startTimeMs);
+ callback.complete(errors);
+ }
+
+ @Override
+ public Collection<RequestAndCompletionHandler> generateRequests() {
+ // build and add requests to the queue
+ List<RequestAndCompletionHandler> list = new ArrayList<>();
+ var currentTimeMs = time.milliseconds();
+ synchronized (nodesToTransactions) {
+ var iter = nodesToTransactions.entrySet().iterator();
+ while (iter.hasNext()) {
+ var entry = iter.next();
+ var node = entry.getKey();
+ var transactionDataAndCallbacks = entry.getValue();
+ if (!inflightNodes.contains(node)) {
+ list.add(new RequestAndCompletionHandler(
+ currentTimeMs,
+ node,
+
AddPartitionsToTxnRequest.Builder.forBroker(transactionDataAndCallbacks.transactionData()),
+ new AddPartitionsToTxnHandler(node,
transactionDataAndCallbacks)
+ ));
+ inflightNodes.add(node);
+ iter.remove();
+ }
+ }
+ }
+ return list;
+ }
+
+ @Override
+ public void shutdown() throws InterruptedException {
+ super.shutdown();
+ metricsGroup.removeMetric(VERIFICATION_FAILURE_RATE_METRIC_NAME);
+ metricsGroup.removeMetric(VERIFICATION_TIME_MS_METRIC_NAME);
+ }
+}
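
As a quick illustration of the two static mapping helpers in the class above, the following sketch (not part of the commit) prints which TransactionSupportedOperation each produce request version resolves to:

import org.apache.kafka.server.transaction.AddPartitionsToTxnManager;
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation;

public class VersionMappingExample {
    public static void main(String[] args) {
        // Versions <= 10 fall back to DEFAULT_ERROR, version 11 adds the abortable-error
        // handling, and versions >= 12 also allow implicit partition adds.
        for (short version = 9; version <= 12; version++) {
            TransactionSupportedOperation op =
                AddPartitionsToTxnManager.produceRequestVersionToTransactionSupportedOperation(version);
            System.out.println("produce v" + version + " -> " + op
                + " (supportsEpochBump=" + op.supportsEpochBump + ")");
        }
    }
}
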
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
index 70ee6d2d624..011b0978b17 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
@@ -137,7 +137,7 @@ public class RaftClusterInvocationContext implements
TestTemplateInvocationConte
.next()
.config()
.controllerListenerNames()
- .head()
+ .get(0)
);
}
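
The .head() to .get(0) change above reflects that controllerListenerNames() now returns a java.util.List<String> parsed from the CSV config value. A tiny illustrative sketch, assuming Csv here is the org.apache.kafka.server.util.Csv helper used by controllerListenerNames():

import org.apache.kafka.server.util.Csv;

import java.util.List;

public class ControllerListenerNamesExample {
    public static void main(String[] args) {
        // controllerListenerNames() parses controller.listener.names as a CSV list,
        // so callers index with get(0) rather than Scala's head().
        List<String> names = Csv.parseCsvList("CONTROLLER,CONTROLLER_SSL");
        System.out.println(names.get(0)); // CONTROLLER
    }
}
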