This is an automated email from the ASF dual-hosted git repository. jolshan pushed a commit to branch 3.7 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.7 by this push: new 14626837b36 cherrypick KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction verification (#15755) 14626837b36 is described below commit 14626837b36c1ae7f757ffa4bcb6d19f8e101701 Author: Justine Olshan <jols...@confluent.io> AuthorDate: Thu Apr 25 10:20:17 2024 -0700 cherrypick KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction verification (#15755) * KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction verification (#15559) KIP-890 Part 1 introduced verification of transactions with the transaction coordinator on the `Produce` and `TxnOffsetCommit` paths. This introduced the possibility of new errors when responding to those requests. For backwards compatibility with older clients, a choice was made to convert some of the new retriable errors to existing errors that are expected and retried correctly by older clients. `NETWORK_EXCEPTION` was forgotten about and not converted, but can occur if, for example, the transaction coordinator is temporarily refusing connections. Now, we convert it to: * `NOT_ENOUGH_REPLICAS` on the `Produce` path, just like the other retriable errors that can arise from transaction verification. * `COORDINATOR_LOAD_IN_PROGRESS` on the `TxnOffsetCommit` path. This error does not force coordinator lookup on clients, unlike `COORDINATOR_NOT_AVAILABLE`. Note that this deviates from KIP-890, which says that retriable errors should be converted to `COORDINATOR_NOT_AVAILABLE`. Reviewers: Artem Livshits <alivsh...@confluent.io>, David Jacot <dja...@confluent.io>, Justine Olshan <jols...@confluent.io> Conflicts: core/src/main/scala/kafka/server/ReplicaManager.scala group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java There were some conflicts in how the code paths changed. We have three paths. 1. 
Produce -- In appendEntries we have the callback just for produce requests. I've included the error and the comment there. 2. Old Group Coordinator -- In GroupMetadataManager, we handle the conversion in maybeConvertOffsetCommitError. This path is separate from the produce path. 3. New Group Coordinator -- Not supported in 3.7, so it is removed. --------- Co-authored-by: Sean Quah <sq...@confluent.io> --- .../scala/kafka/coordinator/group/GroupMetadataManager.scala | 8 ++++++++ core/src/main/scala/kafka/server/ReplicaManager.scala | 6 +++++- .../unit/kafka/coordinator/group/GroupCoordinatorTest.scala | 1 + .../src/test/scala/unit/kafka/server/ReplicaManagerTest.scala | 11 ++++++++++- 4 files changed, 24 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 31af1d81a22..a21d8fd41d4 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -1362,6 +1362,14 @@ object GroupMetadataManager { def maybeConvertOffsetCommitError(error: Errors) : Errors = { error match { + case Errors.NETWORK_EXCEPTION => + // When committing offsets transactionally, we now verify the transaction with the + // transaction coordinator. Verification can fail with `NETWORK_EXCEPTION`, a retriable + // error which older clients may not expect and retry correctly. We translate the error to + // `COORDINATOR_LOAD_IN_PROGRESS` because it causes clients to retry the request without an + // unnecessary coordinator lookup. 
+ Errors.COORDINATOR_LOAD_IN_PROGRESS + case Errors.UNKNOWN_TOPIC_OR_PARTITION | Errors.NOT_ENOUGH_REPLICAS | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND => diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index e9b386992b1..fa0eb0d8581 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -862,13 +862,17 @@ class ReplicaManager(val config: KafkaConfig, val errorResults = (unverifiedEntries ++ errorsPerPartition).map { case (topicPartition, error) => - // translate transaction coordinator errors to known producer response errors + // Transaction verification can fail with a retriable error that older clients may not + // retry correctly. Translate these to an error which will cause such clients to retry + // the produce request. We pick `NOT_ENOUGH_REPLICAS` because it does not trigger a + // metadata refresh. val customException = error match { case Errors.INVALID_TXN_STATE => Some(error.exception("Partition was not added to the transaction")) case Errors.CONCURRENT_TRANSACTIONS | Errors.COORDINATOR_LOAD_IN_PROGRESS | Errors.COORDINATOR_NOT_AVAILABLE | + Errors.NETWORK_EXCEPTION | Errors.NOT_COORDINATOR => Some(new NotEnoughReplicasException( s"Unable to verify the partition has been added to the transaction. 
Underlying error: ${error.toString}")) case _ => None diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 2b6e226ec64..f88e5297991 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -3803,6 +3803,7 @@ class GroupCoordinatorTest { verifyErrors(Errors.INVALID_PRODUCER_ID_MAPPING, Errors.INVALID_PRODUCER_ID_MAPPING) verifyErrors(Errors.INVALID_TXN_STATE, Errors.INVALID_TXN_STATE) + verifyErrors(Errors.NETWORK_EXCEPTION, Errors.COORDINATOR_LOAD_IN_PROGRESS) verifyErrors(Errors.NOT_ENOUGH_REPLICAS, Errors.COORDINATOR_NOT_AVAILABLE) verifyErrors(Errors.NOT_LEADER_OR_FOLLOWER, Errors.NOT_COORDINATOR) verifyErrors(Errors.KAFKA_STORAGE_ERROR, Errors.NOT_COORDINATOR) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 2f67e9c8edb..1c6f2d21e85 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -2524,7 +2524,16 @@ class ReplicaManagerTest { } @ParameterizedTest - @EnumSource(value = classOf[Errors], names = Array("NOT_COORDINATOR", "CONCURRENT_TRANSACTIONS", "COORDINATOR_LOAD_IN_PROGRESS", "COORDINATOR_NOT_AVAILABLE")) + @EnumSource( + value = classOf[Errors], + names = Array( + "NOT_COORDINATOR", + "CONCURRENT_TRANSACTIONS", + "NETWORK_EXCEPTION", + "COORDINATOR_LOAD_IN_PROGRESS", + "COORDINATOR_NOT_AVAILABLE" + ) + ) def testVerificationErrorConversions(error: Errors): Unit = { val tp0 = new TopicPartition(topic, 0) val producerId = 24L