This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a3b34c13151 KAFKA-18662: Return CONCURRENT_TRANSACTIONS on produce
request in TV2 (#18733)
a3b34c13151 is described below
commit a3b34c131517d5aa6c15660a977640802e070d54
Author: Calvin Liu <[email protected]>
AuthorDate: Wed Jan 29 10:15:48 2025 -0800
KAFKA-18662: Return CONCURRENT_TRANSACTIONS on produce request in TV2
(#18733)
While testing, it was found that the NOT_ENOUGH_REPLICAS error (which the
broker returned in place of CONCURRENT_TRANSACTIONS on produce requests) was
very common and could be easily confused. Since we are already bumping the
request, we can signify that the produce request may return
CONCURRENT_TRANSACTIONS directly, and new clients can handle it.
(Note: the Java client should be able to handle this already as a retriable
error, but other client libraries may need to implement this change.)
Reviewers: Justine Olshan <[email protected]>
---
.../kafka/common/requests/ProduceResponse.java | 1 +
.../kafka/clients/producer/KafkaProducerTest.java | 59 ++++++++++++++++++++
.../main/scala/kafka/server/ReplicaManager.scala | 12 +++-
.../unit/kafka/server/ReplicaManagerTest.scala | 64 ++++++++++++++++++++--
4 files changed, 129 insertions(+), 7 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 186ad9b80a1..99f7f475ba5 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -55,6 +55,7 @@ import java.util.stream.Collectors;
* {@link Errors#INVALID_RECORD}
* {@link Errors#INVALID_TXN_STATE}
* {@link Errors#INVALID_PRODUCER_ID_MAPPING}
+ * {@link Errors#CONCURRENT_TRANSACTIONS}
*/
public class ProduceResponse extends AbstractResponse {
public static final long INVALID_OFFSET = -1L;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 0f497b7936d..09662c0a3d6 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -1401,6 +1401,65 @@ public class KafkaProducerTest {
}
}
+ @Test
+ public void testTransactionV2ProduceWithConcurrentTransactionError()
throws Exception {
+ StringSerializer serializer = new StringSerializer();
+ KafkaProducerTestContext<String> ctx = new
KafkaProducerTestContext<>(testInfo, serializer);
+
+ String topic = "foo";
+ TopicPartition topicPartition = new TopicPartition(topic, 0);
+ Cluster cluster = TestUtils.singletonCluster(topic, 1);
+
+ when(ctx.sender.isRunning()).thenReturn(true);
+ when(ctx.metadata.fetch()).thenReturn(cluster);
+
+ long timestamp = ctx.time.milliseconds();
+ ProducerRecord<String, String> record = new ProducerRecord<>(topic, 0,
timestamp, "key", "value");
+
+ Properties props = new Properties();
+ props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9999");
+ props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");
+ props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
+ props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
+ ProducerConfig config = new ProducerConfig(props);
+
+ Time time = new MockTime(1);
+ MetadataResponse initialUpdateResponse =
RequestTestUtils.metadataUpdateWith(1, singletonMap(topic, 1));
+ ProducerMetadata metadata = newMetadata(0, 0, Long.MAX_VALUE);
+ MockClient client = new MockClient(time, metadata);
+ client.updateMetadata(initialUpdateResponse);
+ NodeApiVersions nodeApiVersions = new
NodeApiVersions(NodeApiVersions.create().allSupportedApiVersions().values(),
+ Arrays.asList(new ApiVersionsResponseData.SupportedFeatureKey()
+ .setName("transaction.version")
+ .setMaxVersion((short) 2)
+ .setMinVersion((short) 0)),
+ Arrays.asList(new ApiVersionsResponseData.FinalizedFeatureKey()
+ .setName("transaction.version")
+ .setMaxVersionLevel((short) 2)
+ .setMinVersionLevel((short) 2)),
+ 0);
+ client.setNodeApiVersions(nodeApiVersions);
+ ApiVersions apiVersions = new ApiVersions();
+ apiVersions.update(NODE.idString(), nodeApiVersions);
+
+ ProducerInterceptors<String, String> interceptor = new
ProducerInterceptors<>(Collections.emptyList());
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE,
"some-txn", NODE));
+ client.prepareResponse(initProducerIdResponse(1L, (short) 5,
Errors.NONE));
+ client.prepareResponse(produceResponse(topicPartition, 1L,
Errors.CONCURRENT_TRANSACTIONS, 0, 1));
+ client.prepareResponse(produceResponse(topicPartition, 1L,
Errors.NONE, 0, 1));
+ client.prepareResponse(endTxnResponse(Errors.NONE));
+
+ try (KafkaProducer<String, String> producer = new KafkaProducer<>(
+ config, new StringSerializer(), new StringSerializer(), metadata,
client, interceptor, apiVersions, time)
+ ) {
+ producer.initTransactions();
+ producer.beginTransaction();
+ producer.send(record).get();
+ producer.commitTransaction();
+ }
+ }
+
@Test
public void testMeasureAbortTransactionDuration() {
Map<String, Object> configs = new HashMap<>();
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d8e415028ea..87d1ca927bd 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -738,12 +738,20 @@ class ReplicaManager(val config: KafkaConfig,
// retry correctly. Translate these to an error which will cause
such clients to retry
// the produce request. We pick `NOT_ENOUGH_REPLICAS` because it
does not trigger a
// metadata refresh.
- case Errors.CONCURRENT_TRANSACTIONS |
- Errors.NETWORK_EXCEPTION |
+ case Errors.NETWORK_EXCEPTION |
Errors.COORDINATOR_LOAD_IN_PROGRESS |
Errors.COORDINATOR_NOT_AVAILABLE |
Errors.NOT_COORDINATOR => Some(new
NotEnoughReplicasException(
s"Unable to verify the partition has been added to the
transaction. Underlying error: ${error.toString}"))
+ case Errors.CONCURRENT_TRANSACTIONS =>
+ if (transactionSupportedOperation != addPartition) {
+ Some(new NotEnoughReplicasException(
+ s"Unable to verify the partition has been added to the
transaction. Underlying error: ${error.toString}"))
+ } else {
+ // Don't convert the Concurrent Transaction exception for
TV2, because the error is very common during
+ // the transaction commit phase. Returning Concurrent
Transaction is less confusing to the client.
+ None
+ }
case _ => None
}
topicPartition -> LogAppendResult(
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 12f4710fede..ba6b6f80a27 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -118,7 +118,6 @@ class ReplicaManagerTest {
private var mockRemoteLogManager: RemoteLogManager = _
private var addPartitionsToTxnManager: AddPartitionsToTxnManager = _
private var brokerTopicStats: BrokerTopicStats = _
- private val transactionSupportedOperation = genericErrorSupported
private val quotaExceededThrottleTime = 1000
private val quotaAvailableThrottleTime = 0
@@ -2516,6 +2515,55 @@ class ReplicaManagerTest {
}
}
+ @ParameterizedTest
+ @EnumSource(
+ value = classOf[Errors],
+ names = Array(
+ "NOT_COORDINATOR",
+ "NETWORK_EXCEPTION",
+ "COORDINATOR_LOAD_IN_PROGRESS",
+ "COORDINATOR_NOT_AVAILABLE"
+ )
+ )
+ def testVerificationErrorConversionsTV2(error: Errors): Unit = {
+ val tp0 = new TopicPartition(topic, 0)
+ val producerId = 24L
+ val producerEpoch = 0.toShort
+ val sequence = 0
+ val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+ val replicaManager =
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager,
List(tp0))
+ try {
+ replicaManager.becomeLeaderOrFollower(1,
+ makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new
LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)),
+ (_, _) => ())
+
+ val transactionalRecords =
MemoryRecords.withTransactionalRecords(Compression.NONE, producerId,
producerEpoch, sequence,
+ new SimpleRecord("message".getBytes))
+
+ // Start verification and return the coordinator related errors.
+ val expectedMessage = s"Unable to verify the partition has been added to
the transaction. Underlying error: ${error.toString}"
+ val result = handleProduceAppend(replicaManager, tp0,
transactionalRecords, transactionalId = transactionalId,
transactionSupportedOperation = addPartition)
+ val appendCallback =
ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+ verify(addPartitionsToTxnManager, times(1)).addOrVerifyTransaction(
+ ArgumentMatchers.eq(transactionalId),
+ ArgumentMatchers.eq(producerId),
+ ArgumentMatchers.eq(producerEpoch),
+ ArgumentMatchers.eq(Seq(tp0)),
+ appendCallback.capture(),
+ any()
+ )
+
+ // Confirm we did not write to the log and instead returned the
converted error with the correct error message.
+ val callback: AddPartitionsToTxnManager.AppendCallback =
appendCallback.getValue()
+ callback(Map(tp0 -> error).toMap)
+ assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+ assertEquals(expectedMessage, result.assertFired.errorMessage)
+ } finally {
+ replicaManager.shutdown(checkpointHW = false)
+ }
+ }
+
@ParameterizedTest
@EnumSource(
value = classOf[Errors],
@@ -2527,7 +2575,7 @@ class ReplicaManagerTest {
"COORDINATOR_NOT_AVAILABLE"
)
)
- def testVerificationErrorConversions(error: Errors): Unit = {
+ def testVerificationErrorConversionsTV1(error: Errors): Unit = {
val tp0 = new TopicPartition(topic, 0)
val producerId = 24L
val producerEpoch = 0.toShort
@@ -2869,7 +2917,9 @@ class ReplicaManagerTest {
entriesToAppend:
Map[TopicPartition, MemoryRecords],
transactionalId: String,
origin: AppendOrigin =
AppendOrigin.CLIENT,
- requiredAcks: Short = -1):
CallbackResult[Map[TopicPartition, PartitionResponse]] = {
+ requiredAcks: Short = -1,
+
transactionSupportedOperation: TransactionSupportedOperation =
genericErrorSupported
+ ):
CallbackResult[Map[TopicPartition, PartitionResponse]] = {
val result = new CallbackResult[Map[TopicPartition, PartitionResponse]]()
def appendCallback(responses: Map[TopicPartition, PartitionResponse]):
Unit = {
responses.foreach( response =>
assertTrue(responses.get(response._1).isDefined))
@@ -2894,7 +2944,9 @@ class ReplicaManagerTest {
records: MemoryRecords,
origin: AppendOrigin = AppendOrigin.CLIENT,
requiredAcks: Short = -1,
- transactionalId: String):
CallbackResult[PartitionResponse] = {
+ transactionalId: String,
+ transactionSupportedOperation:
TransactionSupportedOperation = genericErrorSupported
+ ): CallbackResult[PartitionResponse] = {
val result = new CallbackResult[PartitionResponse]()
def appendCallback(responses: Map[TopicPartition, PartitionResponse]):
Unit = {
@@ -2922,7 +2974,9 @@ class ReplicaManagerTest {
transactionalId:
String,
producerId: Long,
producerEpoch:
Short,
- baseSequence: Int
= 0): CallbackResult[Either[Errors, VerificationGuard]] = {
+ baseSequence: Int
= 0,
+
transactionSupportedOperation: TransactionSupportedOperation =
genericErrorSupported
+ ):
CallbackResult[Either[Errors, VerificationGuard]] = {
val result = new CallbackResult[Either[Errors, VerificationGuard]]()
def postVerificationCallback(errorAndGuard: (Errors, VerificationGuard)):
Unit = {
val (error, verificationGuard) = errorAndGuard