This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.7 by this push:
     new 14626837b36 cherrypick KAFKA-16386: Convert NETWORK_EXCEPTIONs from 
KIP-890 transaction verification (#15755)
14626837b36 is described below

commit 14626837b36c1ae7f757ffa4bcb6d19f8e101701
Author: Justine Olshan <jols...@confluent.io>
AuthorDate: Thu Apr 25 10:20:17 2024 -0700

    cherrypick KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction 
verification (#15755)
    
    * KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction 
verification (#15559)
    
    KIP-890 Part 1 introduced verification of transactions with the
    transaction coordinator on the `Produce` and `TxnOffsetCommit` paths.
    This introduced the possibility of new errors when responding to those
    requests. For backwards compatibility with older clients, a choice was
    made to convert some of the new retriable errors to existing errors that
    are expected and retried correctly by older clients.
    
    `NETWORK_EXCEPTION` was forgotten about and not converted, but can occur
    if, for example, the transaction coordinator is temporarily refusing
    connections. Now, we convert it to:
     * `NOT_ENOUGH_REPLICAS` on the `Produce` path, just like the other
       retriable errors that can arise from transaction verification.
     * `COORDINATOR_LOAD_IN_PROGRESS` on the `TxnOffsetCommit` path. This
       error does not force coordinator lookup on clients, unlike
       `COORDINATOR_NOT_AVAILABLE`. Note that this deviates from KIP-890,
       which says that retriable errors should be converted to
       `COORDINATOR_NOT_AVAILABLE`.
    
    Reviewers: Artem Livshits <alivsh...@confluent.io>, David Jacot 
<dja...@confluent.io>, Justine Olshan <jols...@confluent.io>
     Conflicts:
            core/src/main/scala/kafka/server/ReplicaManager.scala
            
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
            
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
    
    There were some conflicts in how the code paths changed. We have three 
paths.
    
    1. Produce -- In appendEntries we have the callback just for produce 
requests. I've included the error and the comment there.
    2. Old Group Coordinator -- In GroupMetadataManager, we handle the 
conversion in maybeConvertOffsetCommitError. This path is separate from the 
produce path.
    3. New Group Coordinator -- Not supported in 3.7, so it is removed.
    
    ---------
    
    Co-authored-by: Sean Quah <sq...@confluent.io>
---
 .../scala/kafka/coordinator/group/GroupMetadataManager.scala  |  8 ++++++++
 core/src/main/scala/kafka/server/ReplicaManager.scala         |  6 +++++-
 .../unit/kafka/coordinator/group/GroupCoordinatorTest.scala   |  1 +
 .../src/test/scala/unit/kafka/server/ReplicaManagerTest.scala | 11 ++++++++++-
 4 files changed, 24 insertions(+), 2 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 31af1d81a22..a21d8fd41d4 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -1362,6 +1362,14 @@ object GroupMetadataManager {
 
   def maybeConvertOffsetCommitError(error: Errors) : Errors = {
     error match {
+      case Errors.NETWORK_EXCEPTION =>
+        // When committing offsets transactionally, we now verify the 
transaction with the
+        // transaction coordinator. Verification can fail with 
`NETWORK_EXCEPTION`, a retriable
+        // error which older clients may not expect and retry correctly. We 
translate the error to
+        // `COORDINATOR_LOAD_IN_PROGRESS` because it causes clients to retry 
the request without an
+        // unnecessary coordinator lookup.
+        Errors.COORDINATOR_LOAD_IN_PROGRESS
+
       case Errors.UNKNOWN_TOPIC_OR_PARTITION
            | Errors.NOT_ENOUGH_REPLICAS
            | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index e9b386992b1..fa0eb0d8581 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -862,13 +862,17 @@ class ReplicaManager(val config: KafkaConfig,
 
     val errorResults = (unverifiedEntries ++ errorsPerPartition).map {
       case (topicPartition, error) =>
-        // translate transaction coordinator errors to known producer response 
errors
+        // Transaction verification can fail with a retriable error that older 
clients may not
+        // retry correctly. Translate these to an error which will cause such 
clients to retry
+        // the produce request. We pick `NOT_ENOUGH_REPLICAS` because it does 
not trigger a
+        // metadata refresh.
         val customException =
           error match {
             case Errors.INVALID_TXN_STATE => Some(error.exception("Partition 
was not added to the transaction"))
             case Errors.CONCURRENT_TRANSACTIONS |
                  Errors.COORDINATOR_LOAD_IN_PROGRESS |
                  Errors.COORDINATOR_NOT_AVAILABLE |
+                 Errors.NETWORK_EXCEPTION |
                  Errors.NOT_COORDINATOR => Some(new NotEnoughReplicasException(
               s"Unable to verify the partition has been added to the 
transaction. Underlying error: ${error.toString}"))
             case _ => None
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 2b6e226ec64..f88e5297991 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -3803,6 +3803,7 @@ class GroupCoordinatorTest {
 
     verifyErrors(Errors.INVALID_PRODUCER_ID_MAPPING, 
Errors.INVALID_PRODUCER_ID_MAPPING)
     verifyErrors(Errors.INVALID_TXN_STATE, Errors.INVALID_TXN_STATE)
+    verifyErrors(Errors.NETWORK_EXCEPTION, Errors.COORDINATOR_LOAD_IN_PROGRESS)
     verifyErrors(Errors.NOT_ENOUGH_REPLICAS, Errors.COORDINATOR_NOT_AVAILABLE)
     verifyErrors(Errors.NOT_LEADER_OR_FOLLOWER, Errors.NOT_COORDINATOR)
     verifyErrors(Errors.KAFKA_STORAGE_ERROR, Errors.NOT_COORDINATOR)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 2f67e9c8edb..1c6f2d21e85 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2524,7 +2524,16 @@ class ReplicaManagerTest {
   }
 
   @ParameterizedTest
-  @EnumSource(value = classOf[Errors], names = Array("NOT_COORDINATOR", 
"CONCURRENT_TRANSACTIONS", "COORDINATOR_LOAD_IN_PROGRESS", 
"COORDINATOR_NOT_AVAILABLE"))
+  @EnumSource(
+    value = classOf[Errors],
+    names = Array(
+      "NOT_COORDINATOR",
+      "CONCURRENT_TRANSACTIONS",
+      "NETWORK_EXCEPTION",
+      "COORDINATOR_LOAD_IN_PROGRESS",
+      "COORDINATOR_NOT_AVAILABLE"
+    )
+  )
   def testVerificationErrorConversions(error: Errors): Unit = {
     val tp0 = new TopicPartition(topic, 0)
     val producerId = 24L

Reply via email to