This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b03c9966a2d MINOR: Add client response auth and version exp handling. 
(#20884)
b03c9966a2d is described below

commit b03c9966a2dffcb410000273eff5b96d62a57164
Author: Sushant Mahajan <[email protected]>
AuthorDate: Sat Nov 15 01:23:12 2025 +0530

    MINOR: Add client response auth and version exp handling. (#20884)
    
    * UnsupportedVersionException and AuthenticationException were not
    explicitly handled in `PersisterStateManager`.
    * This PR addresses the issue.
    * Comprehensive unit tests have been added.
    
    Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal
     <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    |  2 +
 .../share/persister/PersisterStateManager.java     | 24 ++++++--
 .../share/persister/PersisterStateManagerTest.java | 71 ++++++++++++++++------
 3 files changed, 74 insertions(+), 23 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 25fa59930a8..73b6a863aa3 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -2680,6 +2680,8 @@ public class SharePartition {
                 new UnknownTopicOrPartitionException(errorMessage);
             case FENCED_LEADER_EPOCH, FENCED_STATE_EPOCH ->
                 new NotLeaderOrFollowerException(errorMessage);
+            case SASL_AUTHENTICATION_FAILED, UNSUPPORTED_VERSION ->
+                new UnknownServerException("Unable to complete operation due 
to server error.");
             default ->
                 new UnknownServerException(errorMessage);
         };
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
index 3935ef0dd90..9d2b93f93c8 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
@@ -23,7 +23,9 @@ import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.NetworkException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
 import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
@@ -385,12 +387,12 @@ public class PersisterStateManager {
             }
 
             if (isFindCoordinatorResponse(response)) {
-                Optional<Errors> err = checkNetworkError(response, 
this::findCoordinatorErrorResponse);
+                Optional<Errors> err = checkResponseError(response, 
this::findCoordinatorErrorResponse);
                 if (err.isEmpty()) {
                     handleFindCoordinatorResponse(response);
                 }
             } else if (isResponseForRequest(response)) {
-                Optional<Errors> err = checkNetworkError(response, 
this::requestErrorResponse);
+                Optional<Errors> err = checkResponseError(response, 
this::requestErrorResponse);
                 if (err.isEmpty()) {
                     handleRequestResponse(response);
                 }
@@ -399,14 +401,24 @@ public class PersisterStateManager {
         }
 
         // Visibility for testing
-        Optional<Errors> checkNetworkError(ClientResponse response, 
BiConsumer<Errors, Exception> errorConsumer) {
+        Optional<Errors> checkResponseError(ClientResponse response, 
BiConsumer<Errors, Exception> errorConsumer) {
             if (response.hasResponse()) {
                 return Optional.empty();
             }
 
-            log.debug("Response for RPC {} with key {} is invalid - {}.", 
name(), this.partitionKey, response);
-
-            if (response.wasDisconnected()) {
+            log.debug("Response for RPC {} with key {} is invalid - {}", 
name(), this.partitionKey, response);
+
+            if (response.authenticationException() != null) {
+                log.error("Authentication exception", 
response.authenticationException());
+                Errors error = 
Errors.forException(response.authenticationException());
+                errorConsumer.accept(error, new 
AuthenticationException(String.format("Server response for %s indicates 
authentication exception.", this.partitionKey)));
+                return Optional.of(error);
+            } else if (response.versionMismatch() != null) {
+                log.error("Version mismatch exception", 
response.versionMismatch());
+                Errors error = Errors.forException(response.versionMismatch());
+                errorConsumer.accept(error, new 
UnsupportedVersionException(String.format("Server response for %s indicates 
version mismatch.", this.partitionKey)));
+                return Optional.of(error);
+            } else if (response.wasDisconnected()) {
                 errorConsumer.accept(Errors.NETWORK_EXCEPTION, new 
NetworkException(String.format("Server response for %s indicates disconnect.", 
this.partitionKey)));
                 return Optional.of(Errors.NETWORK_EXCEPTION);
             } else if (response.wasTimedOut()) {
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
index cbbfddac542..bdce1713817 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
@@ -23,6 +23,8 @@ import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
 import org.apache.kafka.common.message.FindCoordinatorResponseData;
 import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
@@ -4502,35 +4504,68 @@ class PersisterStateManagerTest {
         boolean hasResponse;
         boolean wasDisconnected;
         boolean wasTimedOut;
+        boolean authException;
+        boolean versionMismatch;
         Optional<Errors> exp;
 
-        TestHolder(boolean hasResponse, boolean wasDisconnected, boolean 
wasTimedOut, Optional<Errors> exp) {
+        TestHolder(boolean hasResponse, boolean wasDisconnected, boolean 
wasTimedOut, boolean isAuthExp, boolean isVersionMismatch, Optional<Errors> 
exp) {
             this.hasResponse = hasResponse;
             this.wasDisconnected = wasDisconnected;
             this.wasTimedOut = wasTimedOut;
+            this.authException = isAuthExp;
+            this.versionMismatch = isVersionMismatch;
             this.exp = exp;
         }
+
+        @Override
+        public String toString() {
+            return "(" +
+                "hasResponse:" + hasResponse + ", " +
+                "wasDisconnected:" + wasDisconnected + ", " +
+                "wasTimedOut:" + wasTimedOut + ", " +
+                "authException:" + authException + ", " +
+                "versionMismatch:" + versionMismatch + ", " +
+                "expErr:" + (exp.isPresent() ? exp.get() : "<empty>") +
+                ")";
+        }
     }
 
     private static Stream<TestHolder> generatorDifferentStates() {
-        return Stream.of(
-            // Let the actual handler handle since response present.
-            new TestHolder(true, false, false, Optional.empty()),
-            new TestHolder(true, true, true, Optional.empty()),
-            new TestHolder(true, false, true, Optional.empty()),
-            new TestHolder(true, true, false, Optional.empty()),
-
-            // Handled by checkNetworkError.
-            new TestHolder(false, true, false, 
Optional.of(Errors.NETWORK_EXCEPTION)),
-            new TestHolder(false, false, true, 
Optional.of(Errors.REQUEST_TIMED_OUT)),
-            new TestHolder(false, true, true, 
Optional.of(Errors.NETWORK_EXCEPTION)),   // takes precedence
-            new TestHolder(false, false, false, 
Optional.of(Errors.UNKNOWN_SERVER_ERROR))
-        );
+        int combs = 1 << 4; // 2^4 combinations
+        List<TestHolder> holders = new ArrayList<>();
+
+        // Let the actual handler handle since response present.
+        for (int i = 1; i <= combs; i++) {
+            holders.add(new TestHolder(true, (i & 1) != 0, (i & 2) != 0, (i & 
4) != 0, (i & 8) != 0, Optional.empty()));
+        }
+
+        // Handled by checkResponseError.
+        for (int i = 1; i <= combs; i++) {
+            boolean disconnect = (i & 1) != 0;
+            boolean timedOut = (i & 2) != 0;
+            boolean authException = (i & 4) != 0;
+            boolean versionMismatch = (i & 8) != 0;
+
+            Optional<Errors> err = Optional.of(Errors.UNKNOWN_SERVER_ERROR);
+            if (authException) {
+                err = Optional.of(Errors.SASL_AUTHENTICATION_FAILED);
+            } else if (versionMismatch) {
+                err = Optional.of(Errors.UNSUPPORTED_VERSION);
+            } else if (disconnect) {
+                err = Optional.of(Errors.NETWORK_EXCEPTION);
+            } else if (timedOut) {
+                err = Optional.of(Errors.REQUEST_TIMED_OUT);
+            }
+
+            holders.add(new TestHolder(false, disconnect, timedOut, 
authException, versionMismatch, err));
+        }
+
+        return holders.stream();
     }
 
     @ParameterizedTest
     @MethodSource("generatorDifferentStates")
-    public void testNetworkErrorHandling(TestHolder holder) {
+    public void testResponseErrorHandling(TestHolder holder) {
         KafkaClient client = mock(KafkaClient.class);
         Timer timer = mock(Timer.class);
         PersisterStateManager psm = PersisterStateManagerBuilder
@@ -4563,7 +4598,9 @@ class PersisterStateManagerTest {
         when(response.hasResponse()).thenReturn(holder.hasResponse);
         when(response.wasDisconnected()).thenReturn(holder.wasDisconnected);
         when(response.wasTimedOut()).thenReturn(holder.wasTimedOut);
-        assertEquals(holder.exp, handler.checkNetworkError(response, (err, 
exp) -> {
-        }));
+        
when(response.authenticationException()).thenReturn(holder.authException ? new 
SaslAuthenticationException("bad stuff") : null);
+        when(response.versionMismatch()).thenReturn(holder.versionMismatch ? 
new UnsupportedVersionException("worse stuff") : null);
+        assertEquals(holder.exp, handler.checkResponseError(response, (err, 
exp) -> {
+        }), holder.toString());
     }
 }

Reply via email to