This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 720f4f446d0 KAFKA-18654[2/2]: Transaction V2 retry add partitions on the server side when handling produce request. (#18810)
720f4f446d0 is described below

commit 720f4f446d0cd26c71c9538e0f49df25b3446308
Author: Calvin Liu <[email protected]>
AuthorDate: Thu Feb 13 09:30:58 2025 -0800

    KAFKA-18654[2/2]: Transaction V2 retry add partitions on the server side when handling produce request. (#18810)
    
    During the transaction commit phase, it is normal to hit a CONCURRENT_TRANSACTIONS error before the transaction markers are fully propagated. Instead of letting the client retry the produce request, it is better to retry on the server side.
    
    Reviewers: Artem Livshits <[email protected]>, Justine Olshan <[email protected]>
---
 core/src/main/scala/kafka/server/KafkaConfig.scala |  4 +-
 .../main/scala/kafka/server/ReplicaManager.scala   | 45 ++++++++++--
 .../unit/kafka/server/ReplicaManagerTest.scala     | 81 +++++++++++++++++++++-
 .../kafka/server/config/AbstractKafkaConfig.java   |  4 +-
 .../transaction/AddPartitionsToTxnConfig.java      | 56 +++++++++++++++
 5 files changed, 178 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 6698d9eb21f..17b8cefb1ee 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.coordinator.group.Group.GroupType
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
+import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionLogConfig, TransactionStateManagerConfig}
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.authorizer.AuthorizerUtils
@@ -206,8 +206,10 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
 
   private val _transactionLogConfig = new TransactionLogConfig(this)
   private val _transactionStateManagerConfig = new TransactionStateManagerConfig(this)
+  private val _addPartitionsToTxnConfig = new AddPartitionsToTxnConfig(this)
   def transactionLogConfig: TransactionLogConfig = _transactionLogConfig
   def transactionStateManagerConfig: TransactionStateManagerConfig = _transactionStateManagerConfig
+  def addPartitionsToTxnConfig: AddPartitionsToTxnConfig = _addPartitionsToTxnConfig
 
   private val _quotaConfig = new QuotaConfig(this)
   def quotaConfig: QuotaConfig = _quotaConfig
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 4cb9ce35f64..e50b5a500ab 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -790,18 +790,49 @@ class ReplicaManager(val config: KafkaConfig,
       return
     }
 
+    // Wrap the callback to be handled on an arbitrary request handler thread
+    // when transaction verification is complete. The request local passed in
+    // is only used when the callback is executed immediately.
+    val wrappedPostVerificationCallback = KafkaRequestHandler.wrapAsyncCallback(
+      postVerificationCallback,
+      requestLocal
+    )
+
+    val retryTimeoutMs = Math.min(config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMaxMs(), config.requestTimeoutMs)
+    val addPartitionsRetryBackoffMs = config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs
+    val startVerificationTimeMs = time.milliseconds
+    def maybeRetryOnConcurrentTransactions(results: (Map[TopicPartition, Errors], Map[TopicPartition, VerificationGuard])): Unit = {
+      if (time.milliseconds() - startVerificationTimeMs >= retryTimeoutMs) {
+        // We've exceeded the retry timeout, so just call the callback with whatever results we have
+        wrappedPostVerificationCallback(results)
+      } else if (results._1.values.exists(_ == Errors.CONCURRENT_TRANSACTIONS)) {
+        // Retry the verification with backoff
+        scheduler.scheduleOnce("retry-add-partitions-to-txn", () => {
+          maybeSendPartitionsToTransactionCoordinator(
+            topicPartitionBatchInfo,
+            transactionalId,
+            transactionalProducerInfo.head._1,
+            transactionalProducerInfo.head._2,
+            maybeRetryOnConcurrentTransactions,
+            transactionSupportedOperation
+          )
+        }, addPartitionsRetryBackoffMs * 1L)
+      } else {
+        // We don't have concurrent transaction errors, so just call the callback with the results
+        wrappedPostVerificationCallback(results)
+      }
+    }
+
     maybeSendPartitionsToTransactionCoordinator(
       topicPartitionBatchInfo,
       transactionalId,
       transactionalProducerInfo.head._1,
       transactionalProducerInfo.head._2,
-      // Wrap the callback to be handled on an arbitrary request handler thread
-      // when transaction verification is complete. The request local passed in
-      // is only used when the callback is executed immediately.
-      KafkaRequestHandler.wrapAsyncCallback(
-        postVerificationCallback,
-        requestLocal
-      ),
+      // If we add partitions directly from the produce request,
+      // we should retry on a concurrent transaction error here because:
+      //  - the produce backoff adds too much delay
+      //  - the produce request is expensive to retry
+      if (transactionSupportedOperation.supportsEpochBump) maybeRetryOnConcurrentTransactions else wrappedPostVerificationCallback,
       transactionSupportedOperation
     )
   }
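
For scale: with the defaults added here (add.partitions.to.txn.retry.backoff.ms=20, add.partitions.to.txn.retry.backoff.max.ms=100) and the broker default request.timeout.ms=30000, retryTimeoutMs = min(100, 30000) = 100 ms, so the server retries at most about five times before handing whatever result it has back to the produce path.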
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 63df3bfc8ee..def3a483c8a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -63,7 +63,7 @@ import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.server.purgatory.DelayedOperationPurgatory
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
 import org.apache.kafka.server.util.timer.MockTimer
-import org.apache.kafka.server.util.{MockScheduler, MockTime}
+import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
 import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, VerificationGuard}
@@ -2249,6 +2249,80 @@ class ReplicaManagerTest {
     }
   }
 
+  @ParameterizedTest
+  @EnumSource(
+    value = classOf[Errors],
+    names = Array(
+      "NOT_COORDINATOR",
+      "CONCURRENT_TRANSACTIONS"
+    )
+  )
+  def testTransactionAddPartitionRetry(error: Errors): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val producerId = 24L
+    val producerEpoch = 0.toShort
+    val sequence = 6
+    val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+    val scheduler = new MockScheduler(time)
+
+    val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0), scheduler = scheduler)
+    try {
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)),
+        (_, _) => ())
+
+      // Append some transactional records.
+      val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+
+      // We should add these partitions to the manager to verify.
+      val result = handleProduceAppend(replicaManager, tp0, transactionalRecords, origin = AppendOrigin.CLIENT,
+        transactionalId = transactionalId, transactionSupportedOperation = addPartition)
+      val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+      verify(addPartitionsToTxnManager, times(1)).addOrVerifyTransaction(
+        ArgumentMatchers.eq(transactionalId),
+        ArgumentMatchers.eq(producerId),
+        ArgumentMatchers.eq(producerEpoch),
+        ArgumentMatchers.eq(Seq(tp0)),
+        appendCallback.capture(),
+        any()
+      )
+      val verificationGuard = getVerificationGuard(replicaManager, tp0, producerId)
+      assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
+
+      // Confirm we did not write to the log and instead returned an error.
+      var callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue()
+      callback(Map(tp0 -> error).toMap)
+
+      if (error != Errors.CONCURRENT_TRANSACTIONS) {
+        // NOT_COORDINATOR is converted to NOT_ENOUGH_REPLICAS
+        assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+      } else {
+        // The append should not finish with an error; it should retry later.
+        assertFalse(result.hasFired)
+        assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
+
+        time.sleep(config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs + 1)
+        scheduler.tick()
+
+        verify(addPartitionsToTxnManager, times(2)).addOrVerifyTransaction(
+          ArgumentMatchers.eq(transactionalId),
+          ArgumentMatchers.eq(producerId),
+          ArgumentMatchers.eq(producerEpoch),
+          ArgumentMatchers.eq(Seq(tp0)),
+          appendCallback.capture(),
+          any()
+        )
+        callback = appendCallback.getValue()
+        callback(Map.empty[TopicPartition, Errors].toMap)
+        assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp0, producerId))
+        assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId, producerEpoch))
+      }
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
   @Test
   def testTransactionVerificationBlocksOutOfOrderSequence(): Unit = {
     val tp0 = new TopicPartition(topic, 0)
@@ -3117,7 +3191,8 @@ class ReplicaManagerTest {
 
   private def setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager: AddPartitionsToTxnManager,
                                                                      transactionalTopicPartitions: List[TopicPartition],
-                                                                     config: KafkaConfig = config): ReplicaManager = {
+                                                                     config: KafkaConfig = config,
+                                                                     scheduler: Scheduler = new MockScheduler(time)): ReplicaManager = {
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
     val metadataCache = mock(classOf[MetadataCache])
 
@@ -3125,7 +3200,7 @@ class ReplicaManagerTest {
       metrics = metrics,
       config = config,
       time = time,
-      scheduler = new MockScheduler(time),
+      scheduler = scheduler,
       logManager = mockLogMgr,
       quotaManagers = quotaManager,
       metadataCache = metadataCache,
diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index 7b01aa5813b..918534fce7a 100644
--- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
 import org.apache.kafka.coordinator.share.ShareCoordinatorConfig;
+import org.apache.kafka.coordinator.transaction.AddPartitionsToTxnConfig;
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
 import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
 import org.apache.kafka.network.SocketServerConfigs;
@@ -63,7 +64,8 @@ public abstract class AbstractKafkaConfig extends AbstractConfig {
         MetricConfigs.CONFIG_DEF,
         QuotaConfig.CONFIG_DEF,
         BrokerSecurityConfigs.CONFIG_DEF,
-        DelegationTokenManagerConfigs.CONFIG_DEF
+        DelegationTokenManagerConfigs.CONFIG_DEF,
+        AddPartitionsToTxnConfig.CONFIG_DEF
     ));
 
     public AbstractKafkaConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
diff --git a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/AddPartitionsToTxnConfig.java b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/AddPartitionsToTxnConfig.java
new file mode 100644
index 00000000000..951bebddf06
--- /dev/null
+++ b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/AddPartitionsToTxnConfig.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.transaction;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+
+public final class AddPartitionsToTxnConfig {
+    // The default config values for the server-side add-partitions-to-transaction operations.
+    public static final String ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_CONFIG = "add.partitions.to.txn.retry.backoff.max.ms";
+    public static final int ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_DEFAULT = 100;
+    public static final String ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_DOC = "The maximum allowed timeout for adding " +
+            "partitions to transactions on the server side. It only applies to the actual add partition operations, " +
+            "not the verification. It will not be effective if it is larger than request.timeout.ms.";
+    public static final String ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_CONFIG = "add.partitions.to.txn.retry.backoff.ms";
+    public static final int ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_DEFAULT = 20;
+    public static final String ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_DOC = "The server-side retry backoff when the server " +
+        "attempts to add the partition to the transaction.";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+        .define(ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_CONFIG, INT, ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_DEFAULT, atLeast(0), HIGH, ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_DOC)
+        .define(ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_CONFIG, INT, ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_DEFAULT, atLeast(1), HIGH, ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_DOC);
+
+    private final int addPartitionsToTxnRetryBackoffMaxMs;
+    private final int addPartitionsToTxnRetryBackoffMs;
+
+    public AddPartitionsToTxnConfig(AbstractConfig config) {
+        addPartitionsToTxnRetryBackoffMaxMs = config.getInt(AddPartitionsToTxnConfig.ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_CONFIG);
+        addPartitionsToTxnRetryBackoffMs = config.getInt(AddPartitionsToTxnConfig.ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_CONFIG);
+    }
+    public int addPartitionsToTxnRetryBackoffMaxMs() {
+        return addPartitionsToTxnRetryBackoffMaxMs;
+    }
+    public int addPartitionsToTxnRetryBackoffMs() {
+        return addPartitionsToTxnRetryBackoffMs;
+    }
+}
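
Both settings are registered as broker configs (via AbstractKafkaConfig above), so they can be overridden in the broker properties; the defaults are add.partitions.to.txn.retry.backoff.ms=20 and add.partitions.to.txn.retry.backoff.max.ms=100. A short sketch of how the accessors are consumed, mirroring the ReplicaManager change above (assumes a fully constructed KafkaConfig named config):

    val backoffMs = config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs    // default 20 ms
    val capMs = config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMaxMs()   // default 100 ms
    val windowMs = Math.min(capMs, config.requestTimeoutMs) // the retry window never exceeds the request timeout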
