This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 720f4f446d0 KAFKA-18654[2/2]: Transaction V2 retry add partitions on the server side when handling produce request. (#18810)
720f4f446d0 is described below
commit 720f4f446d0cd26c71c9538e0f49df25b3446308
Author: Calvin Liu <[email protected]>
AuthorDate: Thu Feb 13 09:30:58 2025 -0800
KAFKA-18654[2/2]: Transaction V2 retry add partitions on the server side when handling produce request. (#18810)
During the transaction commit phase, it is normal to hit a CONCURRENT_TRANSACTIONS
error before the transaction markers are fully propagated. Instead of letting the
client retry the produce request, it is better to retry on the server side.
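The gist of the server-side retry is a schedule-and-retry-with-backoff loop. A minimal
sketch of that pattern only (illustrative names; the actual change below lives in
ReplicaManager and uses the broker scheduler plus the new AddPartitionsToTxnConfig
backoff settings):

    import java.util.concurrent.{Executors, TimeUnit}

    // Illustrative only: retry an "add partitions" attempt while it keeps hitting a
    // retriable CONCURRENT_TRANSACTIONS error, backing off between attempts and
    // giving up once the overall retry window has elapsed.
    object AddPartitionsRetrySketch {
      private val scheduler = Executors.newSingleThreadScheduledExecutor()

      def retryWithBackoff(attempt: () => Boolean,   // true => hit CONCURRENT_TRANSACTIONS, retry
                           onDone: () => Unit,       // completes the produce request callback
                           backoffMs: Long,
                           retryTimeoutMs: Long,
                           startMs: Long = System.currentTimeMillis()): Unit = {
        val retriable = attempt()
        if (retriable && System.currentTimeMillis() - startMs < retryTimeoutMs) {
          // Still within the retry window: schedule another attempt after the backoff.
          scheduler.schedule(new Runnable {
            override def run(): Unit = retryWithBackoff(attempt, onDone, backoffMs, retryTimeoutMs, startMs)
          }, backoffMs, TimeUnit.MILLISECONDS)
        } else {
          // Either succeeded, hit a non-retriable error, or ran out of time.
          onDone()
        }
      }
    }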
Reviewers: Artem Livshits <[email protected]>, Justine Olshan
<[email protected]>
---
core/src/main/scala/kafka/server/KafkaConfig.scala | 4 +-
.../main/scala/kafka/server/ReplicaManager.scala | 45 ++++++++++--
.../unit/kafka/server/ReplicaManagerTest.scala | 81 +++++++++++++++++++++-
.../kafka/server/config/AbstractKafkaConfig.java | 4 +-
.../transaction/AddPartitionsToTxnConfig.java | 56 +++++++++++++++
5 files changed, 178 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 6698d9eb21f..17b8cefb1ee 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
+import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionLogConfig, TransactionStateManagerConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.authorizer.AuthorizerUtils
@@ -206,8 +206,10 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
   private val _transactionLogConfig = new TransactionLogConfig(this)
   private val _transactionStateManagerConfig = new TransactionStateManagerConfig(this)
+  private val _addPartitionsToTxnConfig = new AddPartitionsToTxnConfig(this)
   def transactionLogConfig: TransactionLogConfig = _transactionLogConfig
   def transactionStateManagerConfig: TransactionStateManagerConfig = _transactionStateManagerConfig
+  def addPartitionsToTxnConfig: AddPartitionsToTxnConfig = _addPartitionsToTxnConfig
private val _quotaConfig = new QuotaConfig(this)
def quotaConfig: QuotaConfig = _quotaConfig
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 4cb9ce35f64..e50b5a500ab 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -790,18 +790,49 @@ class ReplicaManager(val config: KafkaConfig,
return
}
+    // Wrap the callback to be handled on an arbitrary request handler thread
+    // when transaction verification is complete. The request local passed in
+    // is only used when the callback is executed immediately.
+    val wrappedPostVerificationCallback = KafkaRequestHandler.wrapAsyncCallback(
+      postVerificationCallback,
+      requestLocal
+    )
+
+    val retryTimeoutMs = Math.min(config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMaxMs(), config.requestTimeoutMs)
+    val addPartitionsRetryBackoffMs = config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs
+    val startVerificationTimeMs = time.milliseconds
+    def maybeRetryOnConcurrentTransactions(results: (Map[TopicPartition, Errors], Map[TopicPartition, VerificationGuard])): Unit = {
+      if (time.milliseconds() - startVerificationTimeMs >= retryTimeoutMs) {
+        // We've exceeded the retry timeout, so just call the callback with whatever results we have
+        wrappedPostVerificationCallback(results)
+      } else if (results._1.values.exists(_ == Errors.CONCURRENT_TRANSACTIONS)) {
+        // Retry the verification with backoff
+        scheduler.scheduleOnce("retry-add-partitions-to-txn", () => {
+          maybeSendPartitionsToTransactionCoordinator(
+            topicPartitionBatchInfo,
+            transactionalId,
+            transactionalProducerInfo.head._1,
+            transactionalProducerInfo.head._2,
+            maybeRetryOnConcurrentTransactions,
+            transactionSupportedOperation
+          )
+        }, addPartitionsRetryBackoffMs * 1L)
+      } else {
+        // We don't have concurrent transaction errors, so just call the callback with the results
+        wrappedPostVerificationCallback(results)
+      }
+    }
+
maybeSendPartitionsToTransactionCoordinator(
topicPartitionBatchInfo,
transactionalId,
transactionalProducerInfo.head._1,
transactionalProducerInfo.head._2,
- // Wrap the callback to be handled on an arbitrary request handler thread
- // when transaction verification is complete. The request local passed in
- // is only used when the callback is executed immediately.
- KafkaRequestHandler.wrapAsyncCallback(
- postVerificationCallback,
- requestLocal
- ),
+ // If we add partition directly from produce request,
+ // we should retry on concurrent transaction error here because:
+ // - the produce backoff adds too much delay
+ // - the produce request is expensive to retry
+      if (transactionSupportedOperation.supportsEpochBump) maybeRetryOnConcurrentTransactions else wrappedPostVerificationCallback,
transactionSupportedOperation
)
}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 63df3bfc8ee..def3a483c8a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -63,7 +63,7 @@ import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
import org.apache.kafka.server.util.timer.MockTimer
-import org.apache.kafka.server.util.{MockScheduler, MockTime}
+import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, VerificationGuard}
@@ -2249,6 +2249,80 @@ class ReplicaManagerTest {
}
}
+ @ParameterizedTest
+ @EnumSource(
+ value = classOf[Errors],
+ names = Array(
+ "NOT_COORDINATOR",
+ "CONCURRENT_TRANSACTIONS"
+ )
+ )
+ def testTransactionAddPartitionRetry(error: Errors): Unit = {
+ val tp0 = new TopicPartition(topic, 0)
+ val producerId = 24L
+ val producerEpoch = 0.toShort
+ val sequence = 6
+ val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+ val scheduler = new MockScheduler(time)
+
+    val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0), scheduler = scheduler)
+    try {
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)),
+        (_, _) => ())
+
+      // Append some transactional records.
+      val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+
+      // We should add these partitions to the manager to verify.
+      val result = handleProduceAppend(replicaManager, tp0, transactionalRecords, origin = AppendOrigin.CLIENT,
+        transactionalId = transactionalId, transactionSupportedOperation = addPartition)
+      val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+      verify(addPartitionsToTxnManager, times(1)).addOrVerifyTransaction(
+        ArgumentMatchers.eq(transactionalId),
+        ArgumentMatchers.eq(producerId),
+        ArgumentMatchers.eq(producerEpoch),
+        ArgumentMatchers.eq(Seq(tp0)),
+        appendCallback.capture(),
+        any()
+      )
+      val verificationGuard = getVerificationGuard(replicaManager, tp0, producerId)
+      assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
+
+      // Confirm we did not write to the log and instead returned error.
+      var callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue()
+      callback(Map(tp0 -> error).toMap)
+
+      if (error != Errors.CONCURRENT_TRANSACTIONS) {
+        // NOT_COORDINATOR is converted to NOT_ENOUGH_REPLICAS
+        assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+      } else {
+        // The append should not finish with error, it should retry later.
+        assertFalse(result.hasFired)
+        assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
+
+        time.sleep(config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs + 1)
+        scheduler.tick()
+
+        verify(addPartitionsToTxnManager, times(2)).addOrVerifyTransaction(
+          ArgumentMatchers.eq(transactionalId),
+          ArgumentMatchers.eq(producerId),
+          ArgumentMatchers.eq(producerEpoch),
+          ArgumentMatchers.eq(Seq(tp0)),
+          appendCallback.capture(),
+          any()
+        )
+        callback = appendCallback.getValue()
+        callback(Map.empty[TopicPartition, Errors].toMap)
+        assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp0, producerId))
+        assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId, producerEpoch))
+ }
+ } finally {
+ replicaManager.shutdown(checkpointHW = false)
+ }
+ }
+
@Test
def testTransactionVerificationBlocksOutOfOrderSequence(): Unit = {
val tp0 = new TopicPartition(topic, 0)
@@ -3117,7 +3191,8 @@ class ReplicaManagerTest {
   private def setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager: AddPartitionsToTxnManager,
                                                                      transactionalTopicPartitions: List[TopicPartition],
-                                                                     config: KafkaConfig = config): ReplicaManager = {
+                                                                     config: KafkaConfig = config,
+                                                                     scheduler: Scheduler = new MockScheduler(time)): ReplicaManager = {
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
val metadataCache = mock(classOf[MetadataCache])
@@ -3125,7 +3200,7 @@ class ReplicaManagerTest {
metrics = metrics,
config = config,
time = time,
- scheduler = new MockScheduler(time),
+ scheduler = scheduler,
logManager = mockLogMgr,
quotaManagers = quotaManager,
metadataCache = metadataCache,
diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index 7b01aa5813b..918534fce7a 100644
--- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
import org.apache.kafka.coordinator.share.ShareCoordinatorConfig;
+import org.apache.kafka.coordinator.transaction.AddPartitionsToTxnConfig;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
import org.apache.kafka.network.SocketServerConfigs;
@@ -63,7 +64,8 @@ public abstract class AbstractKafkaConfig extends AbstractConfig {
MetricConfigs.CONFIG_DEF,
QuotaConfig.CONFIG_DEF,
BrokerSecurityConfigs.CONFIG_DEF,
- DelegationTokenManagerConfigs.CONFIG_DEF
+ DelegationTokenManagerConfigs.CONFIG_DEF,
+ AddPartitionsToTxnConfig.CONFIG_DEF
));
     public AbstractKafkaConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
diff --git a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/AddPartitionsToTxnConfig.java b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/AddPartitionsToTxnConfig.java
new file mode 100644
index 00000000000..951bebddf06
--- /dev/null
+++ b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/AddPartitionsToTxnConfig.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.transaction;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+
+public final class AddPartitionsToTxnConfig {
+    // The default config values for the server-side add partition to transaction operations.
+    public static final String ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_CONFIG = "add.partitions.to.txn.retry.backoff.max.ms";
+    public static final int ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_DEFAULT = 100;
+    public static final String ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_DOC = "The maximum allowed timeout for adding " +
+            "partitions to transactions on the server side. It only applies to the actual add partition operations, " +
+            "not the verification. It will not be effective if it is larger than request.timeout.ms";
+    public static final String ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_CONFIG = "add.partitions.to.txn.retry.backoff.ms";
+    public static final int ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_DEFAULT = 20;
+    public static final String ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_DOC = "The server-side retry backoff when the server attempts " +
+            "to add the partition to the transaction";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_CONFIG, INT, ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_DEFAULT, atLeast(0), HIGH, ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_DOC)
+            .define(ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_CONFIG, INT, ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_DEFAULT, atLeast(1), HIGH, ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_DOC);
+
+ private final int addPartitionsToTxnRetryBackoffMaxMs;
+ private final int addPartitionsToTxnRetryBackoffMs;
+
+ public AddPartitionsToTxnConfig(AbstractConfig config) {
+        addPartitionsToTxnRetryBackoffMaxMs = config.getInt(AddPartitionsToTxnConfig.ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_CONFIG);
+        addPartitionsToTxnRetryBackoffMs = config.getInt(AddPartitionsToTxnConfig.ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_CONFIG);
+ }
+ public int addPartitionsToTxnRetryBackoffMaxMs() {
+ return addPartitionsToTxnRetryBackoffMaxMs;
+ }
+ public int addPartitionsToTxnRetryBackoffMs() {
+ return addPartitionsToTxnRetryBackoffMs;
+ }
+}
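
For reference, a hypothetical standalone use of the new config class added above (nothing
broker-side; only AddPartitionsToTxnConfig and Kafka's AbstractConfig), showing the two new
broker properties and how ReplicaManager caps the overall retry window with request.timeout.ms
(the request.timeout.ms value below is just an example):

    import java.util.Properties

    import org.apache.kafka.common.config.AbstractConfig
    import org.apache.kafka.coordinator.transaction.AddPartitionsToTxnConfig

    object AddPartitionsToTxnConfigExample {
      def main(args: Array[String]): Unit = {
        // The two new broker properties introduced by this commit (defaults: 20 and 100).
        val props = new Properties()
        props.put(AddPartitionsToTxnConfig.ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_CONFIG, "25")
        props.put(AddPartitionsToTxnConfig.ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_CONFIG, "200")

        val config = new AddPartitionsToTxnConfig(new AbstractConfig(AddPartitionsToTxnConfig.CONFIG_DEF, props))

        // ReplicaManager takes the smaller of the max backoff and request.timeout.ms as the
        // overall retry window for server-side add-partitions retries.
        val requestTimeoutMs = 30000
        val retryTimeoutMs = math.min(config.addPartitionsToTxnRetryBackoffMaxMs(), requestTimeoutMs)
        println(s"backoff=${config.addPartitionsToTxnRetryBackoffMs()}ms retryWindow=${retryTimeoutMs}ms")
      }
    }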