This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new ff65f765d4a KAFKA-19990 gracefully handle exceptions when handling AllocateProducerIdsResponse (#21135)
ff65f765d4a is described below

commit ff65f765d4a03f72e19625f358e724b8106a6f1e
Author: Gaurav Narula <[email protected]>
AuthorDate: Wed Dec 17 16:43:32 2025 +0000

    KAFKA-19990 gracefully handle exceptions when handling AllocateProducerIdsResponse (#21135)
    
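    The AllocateProducerIds response handler in `RPCProducerIdManager` does not
    gracefully handle authentication exceptions, version mismatch exceptions, or
    responses that carry no body. Previously, such responses were silently
    ignored instead of triggering the retry backoff.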
    
    This change applies the retry backoff on such failures, so that a new
    allocation is attempted once the backoff expires, and adds unit tests for
    these scenarios.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../transaction/RPCProducerIdManager.java          |  37 +++++--
 .../transaction/ProducerIdManagerTest.java         | 110 ++++++++++++++++++---
 2 files changed, 122 insertions(+), 25 deletions(-)

diff --git 
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java
 
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java
index 32e43880ac1..e4e3772de60 100644
--- 
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java
+++ 
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java
@@ -51,7 +51,8 @@ public class RPCProducerIdManager implements 
ProducerIdManager {
     private final String logPrefix;
 
     private final int brokerId;
-    private final Time time;
+    // Visible for testing
+    final Time time;
     private final Supplier<Long> brokerEpochSupplier;
     private final NodeToControllerChannelManager controllerChannel;
 
@@ -129,9 +130,7 @@ public class RPCProducerIdManager implements 
ProducerIdManager {
 
             @Override
             public void onComplete(ClientResponse response) {
-                if (response.responseBody() instanceof 
AllocateProducerIdsResponse) {
-                    
handleAllocateProducerIdsResponse((AllocateProducerIdsResponse) 
response.responseBody());
-                }
+                handleAllocateProducerIdsResponse(response);
             }
 
             @Override
@@ -142,7 +141,30 @@ public class RPCProducerIdManager implements 
ProducerIdManager {
         });
     }
 
-    protected void 
handleAllocateProducerIdsResponse(AllocateProducerIdsResponse response) {
+    private void handleUnsuccessfulResponse() {
+        // There is no need to compare and set because only one thread
+        // handles the AllocateProducerIds response.
+        backoffDeadlineMs.set(time.milliseconds() + RETRY_BACKOFF_MS);
+        requestInFlight.set(false);
+    }
+
+    protected void handleAllocateProducerIdsResponse(ClientResponse 
clientResponse) {
+        if (clientResponse.authenticationException() != null) {
+            log.error("{} Unable to allocate producer id because of an 
authentication exception", logPrefix, clientResponse.authenticationException());
+            handleUnsuccessfulResponse();
+            return;
+        }
+        if (clientResponse.versionMismatch() != null) {
+            log.error("{} Unable to allocate producer id because of a version 
mismatch exception", logPrefix, clientResponse.versionMismatch());
+            handleUnsuccessfulResponse();
+            return;
+        }
+        if (!clientResponse.hasResponse()) {
+            log.error("{} Unable to allocate producer id because of empty 
response from controller", logPrefix);
+            handleUnsuccessfulResponse();
+            return;
+        }
+        AllocateProducerIdsResponse response = (AllocateProducerIdsResponse) 
clientResponse.responseBody();
         var data = response.data();
         var successfulResponse = false;
         var errors = Errors.forCode(data.errorCode());
@@ -161,10 +183,7 @@ public class RPCProducerIdManager implements 
ProducerIdManager {
                 log.error("{} Received error code {} from the controller.", 
logPrefix, errors);
         }
         if (!successfulResponse) {
-            // There is no need to compare and set because only one thread
-            // handles the AllocateProducerIds response.
-            backoffDeadlineMs.set(time.milliseconds() + RETRY_BACKOFF_MS);
-            requestInFlight.set(false);
+            handleUnsuccessfulResponse();
         }
     }
 
diff --git 
a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/ProducerIdManagerTest.java
 
b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/ProducerIdManagerTest.java
index 9cc328dc6bc..35f3fc9edc4 100644
--- 
a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/ProducerIdManagerTest.java
+++ 
b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/ProducerIdManagerTest.java
@@ -16,7 +16,10 @@
  */
 package org.apache.kafka.coordinator.transaction;
 
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AllocateProducerIdsResponse;
@@ -61,43 +64,82 @@ public class ProducerIdManagerTest {
         private final ExecutorService brokerToControllerRequestExecutor = 
Executors.newSingleThreadExecutor();
         private final int idLen;
         private Long idStart;
+        private boolean hasAuthenticationException;
+        private boolean hasVersionMismatch;
+        private boolean hasNoResponse;
 
         MockProducerIdManager(int brokerId,
                               long idStart,
                               int idLen,
                               Queue<Errors> errorQueue,
                               boolean isErroneousBlock,
-                              Time time) {
+                              Time time,
+                              boolean hasAuthenticationException,
+                              boolean hasVersionMismatch,
+                              boolean hasNoResponse) {
             super(brokerId, time, () -> 1L, brokerToController);
             this.idStart = idStart;
             this.idLen = idLen;
             this.errorQueue = errorQueue;
             this.isErroneousBlock = isErroneousBlock;
+            this.hasAuthenticationException = hasAuthenticationException;
+            this.hasVersionMismatch = hasVersionMismatch;
+            this.hasNoResponse = hasNoResponse;
+        }
+
+        private ClientResponse createClientResponse(
+                AuthenticationException authenticationException,
+                UnsupportedVersionException versionException,
+                AllocateProducerIdsResponse response
+        ) {
+            return new ClientResponse(null, null, null, time.milliseconds(), 
time.milliseconds(),
+                    false, versionException, authenticationException, 
response);
         }
 
         @Override
         protected void sendRequest() {
             brokerToControllerRequestExecutor.submit(() -> {
+                if (hasAuthenticationException) {
+                    handleAllocateProducerIdsResponse(createClientResponse(new 
AuthenticationException("Auth Failure"), null, null));
+                    hasAuthenticationException = false; // reset so retry works
+                    return;
+                }
+                if (hasVersionMismatch) {
+                    
handleAllocateProducerIdsResponse(createClientResponse(null, new 
UnsupportedVersionException("Version Mismatch"), null));
+                    hasVersionMismatch = false; // reset so retry works
+                    return;
+                }
+                if (hasNoResponse) {
+                    
handleAllocateProducerIdsResponse(createClientResponse(null, null, null));
+                    hasNoResponse = false; // reset so retry works
+                    return;
+                }
                 Errors error = errorQueue.poll();
                 if (error == null || error == Errors.NONE) {
-                    handleAllocateProducerIdsResponse(new 
AllocateProducerIdsResponse(
-                            new AllocateProducerIdsResponseData()
-                                    .setProducerIdStart(idStart)
-                                    .setProducerIdLen(idLen)
-                    ));
+                    handleAllocateProducerIdsResponse(createClientResponse(
+                            null,
+                            null,
+                            new AllocateProducerIdsResponse(
+                                    new AllocateProducerIdsResponseData()
+                                            .setProducerIdStart(idStart)
+                                            .setProducerIdLen(idLen)
+                            )));
                     if (!isErroneousBlock) {
                         idStart += idLen;
                     }
                 } else {
-                    handleAllocateProducerIdsResponse(new 
AllocateProducerIdsResponse(
-                            new 
AllocateProducerIdsResponseData().setErrorCode(error.code())
-                    ));
+                    handleAllocateProducerIdsResponse(createClientResponse(
+                            null,
+                            null,
+                            new AllocateProducerIdsResponse(
+                                    new 
AllocateProducerIdsResponseData().setErrorCode(error.code())
+                            )));
                 }
             }, 0);
         }
 
         @Override
-        protected void 
handleAllocateProducerIdsResponse(AllocateProducerIdsResponse response) {
+        protected void handleAllocateProducerIdsResponse(ClientResponse 
response) {
             super.handleAllocateProducerIdsResponse(response);
             capturedFailure.set(nextProducerIdBlock.get() == null);
         }
@@ -112,7 +154,7 @@ public class ProducerIdManagerTest {
         var numThreads = 5;
         var latch = new CountDownLatch(idBlockLen * 3);
         var manager = new MockProducerIdManager(0, 0, idBlockLen,
-                new ConcurrentLinkedQueue<>(), false, Time.SYSTEM);
+                new ConcurrentLinkedQueue<>(), false, Time.SYSTEM, false, 
false, false);
         var requestHandlerThreadPool = 
Executors.newFixedThreadPool(numThreads);
         Map<Long, Integer> pidMap = new ConcurrentHashMap<>();
 
@@ -149,7 +191,7 @@ public class ProducerIdManagerTest {
     @EnumSource(value = Errors.class, names = {"UNKNOWN_SERVER_ERROR", 
"INVALID_REQUEST"})
     public void testUnrecoverableErrors(Errors error) throws Exception {
         var time = new MockTime();
-        var manager = new MockProducerIdManager(0, 0, 1, queue(Errors.NONE, 
error), false, time);
+        var manager = new MockProducerIdManager(0, 0, 1, queue(Errors.NONE, 
error), false, time, false, false, false);
         verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0);
         verifyFailureWithoutGenerateProducerId(manager);
 
@@ -159,20 +201,56 @@ public class ProducerIdManagerTest {
 
     @Test
     public void testInvalidRanges() throws InterruptedException {
-        var manager = new MockProducerIdManager(0, -1, 10, new 
ConcurrentLinkedQueue<>(), true, Time.SYSTEM);
+        var manager = new MockProducerIdManager(0, -1, 10, new 
ConcurrentLinkedQueue<>(), true, Time.SYSTEM, false, false, false);
         verifyFailure(manager);
 
-        manager = new MockProducerIdManager(0, 0, -1, new 
ConcurrentLinkedQueue<>(), true, Time.SYSTEM);
+        manager = new MockProducerIdManager(0, 0, -1, new 
ConcurrentLinkedQueue<>(), true, Time.SYSTEM, false, false, false);
         verifyFailure(manager);
 
-        manager = new MockProducerIdManager(0, Long.MAX_VALUE - 1, 10, new 
ConcurrentLinkedQueue<>(), true, Time.SYSTEM);
+        manager = new MockProducerIdManager(0, Long.MAX_VALUE - 1, 10, new 
ConcurrentLinkedQueue<>(), true, Time.SYSTEM, false, false, false);
         verifyFailure(manager);
     }
 
     @Test
     public void testRetryBackoff() throws Exception {
         var time = new MockTime();
-        var manager = new MockProducerIdManager(0, 0, 1, 
queue(Errors.UNKNOWN_SERVER_ERROR), false, time);
+        var manager = new MockProducerIdManager(0, 0, 1, 
queue(Errors.UNKNOWN_SERVER_ERROR), false, time, false, false, false);
+
+        verifyFailure(manager);
+
+        assertThrows(CoordinatorLoadInProgressException.class, 
manager::generateProducerId);
+        time.sleep(RETRY_BACKOFF_MS);
+        verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0);
+    }
+
+    @Test
+    public void testRetryBackoffOnAuthException() throws Exception {
+        var time = new MockTime();
+        var manager = new MockProducerIdManager(0, 0, 1, new 
ConcurrentLinkedQueue<>(), false, time, true, false, false);
+
+        verifyFailure(manager);
+
+        assertThrows(CoordinatorLoadInProgressException.class, 
manager::generateProducerId);
+        time.sleep(RETRY_BACKOFF_MS);
+        verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0);
+    }
+
+    @Test
+    public void testRetryBackoffOnVersionMismatch() throws Exception {
+        var time = new MockTime();
+        var manager = new MockProducerIdManager(0, 0, 1, new 
ConcurrentLinkedQueue<>(), false, time, false, true, false);
+
+        verifyFailure(manager);
+
+        assertThrows(CoordinatorLoadInProgressException.class, 
manager::generateProducerId);
+        time.sleep(RETRY_BACKOFF_MS);
+        verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0);
+    }
+
+    @Test
+    public void testRetryBackoffOnNoResponse() throws Exception {
+        var time = new MockTime();
+        var manager = new MockProducerIdManager(0, 0, 1, new 
ConcurrentLinkedQueue<>(), false, time, false, false, true);
 
         verifyFailure(manager);
 

Reply via email to