This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b03c9966a2d MINOR: Add client response auth and version exp handling.
(#20884)
b03c9966a2d is described below
commit b03c9966a2dffcb410000273eff5b96d62a57164
Author: Sushant Mahajan <[email protected]>
AuthorDate: Sat Nov 15 01:23:12 2025 +0530
MINOR: Add client response auth and version exp handling. (#20884)
* UnsupportedVersion and AuthenticationException were not explicitly
handled in `PersisterStateManger`.
* The issue to addressed in the PR.
* Comprehensive unit tests have been added.
Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal
<[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 2 +
.../share/persister/PersisterStateManager.java | 24 ++++++--
.../share/persister/PersisterStateManagerTest.java | 71 ++++++++++++++++------
3 files changed, 74 insertions(+), 23 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 25fa59930a8..73b6a863aa3 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -2680,6 +2680,8 @@ public class SharePartition {
new UnknownTopicOrPartitionException(errorMessage);
case FENCED_LEADER_EPOCH, FENCED_STATE_EPOCH ->
new NotLeaderOrFollowerException(errorMessage);
+ case SASL_AUTHENTICATION_FAILED, UNSUPPORTED_VERSION ->
+ new UnknownServerException("Unable to complete operation due
to server error.");
default ->
new UnknownServerException(errorMessage);
};
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
index 3935ef0dd90..9d2b93f93c8 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
@@ -23,7 +23,9 @@ import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.NetworkException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
@@ -385,12 +387,12 @@ public class PersisterStateManager {
}
if (isFindCoordinatorResponse(response)) {
- Optional<Errors> err = checkNetworkError(response,
this::findCoordinatorErrorResponse);
+ Optional<Errors> err = checkResponseError(response,
this::findCoordinatorErrorResponse);
if (err.isEmpty()) {
handleFindCoordinatorResponse(response);
}
} else if (isResponseForRequest(response)) {
- Optional<Errors> err = checkNetworkError(response,
this::requestErrorResponse);
+ Optional<Errors> err = checkResponseError(response,
this::requestErrorResponse);
if (err.isEmpty()) {
handleRequestResponse(response);
}
@@ -399,14 +401,24 @@ public class PersisterStateManager {
}
// Visibility for testing
- Optional<Errors> checkNetworkError(ClientResponse response,
BiConsumer<Errors, Exception> errorConsumer) {
+ Optional<Errors> checkResponseError(ClientResponse response,
BiConsumer<Errors, Exception> errorConsumer) {
if (response.hasResponse()) {
return Optional.empty();
}
- log.debug("Response for RPC {} with key {} is invalid - {}.",
name(), this.partitionKey, response);
-
- if (response.wasDisconnected()) {
+ log.debug("Response for RPC {} with key {} is invalid - {}",
name(), this.partitionKey, response);
+
+ if (response.authenticationException() != null) {
+ log.error("Authentication exception",
response.authenticationException());
+ Errors error =
Errors.forException(response.authenticationException());
+ errorConsumer.accept(error, new
AuthenticationException(String.format("Server response for %s indicates
authentication exception.", this.partitionKey)));
+ return Optional.of(error);
+ } else if (response.versionMismatch() != null) {
+ log.error("Version mismatch exception",
response.versionMismatch());
+ Errors error = Errors.forException(response.versionMismatch());
+ errorConsumer.accept(error, new
UnsupportedVersionException(String.format("Server response for %s indicates
version mismatch.", this.partitionKey)));
+ return Optional.of(error);
+ } else if (response.wasDisconnected()) {
errorConsumer.accept(Errors.NETWORK_EXCEPTION, new
NetworkException(String.format("Server response for %s indicates disconnect.",
this.partitionKey)));
return Optional.of(Errors.NETWORK_EXCEPTION);
} else if (response.wasTimedOut()) {
diff --git
a/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
b/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
index cbbfddac542..bdce1713817 100644
---
a/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
@@ -23,6 +23,8 @@ import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
@@ -4502,35 +4504,68 @@ class PersisterStateManagerTest {
boolean hasResponse;
boolean wasDisconnected;
boolean wasTimedOut;
+ boolean authException;
+ boolean versionMismatch;
Optional<Errors> exp;
- TestHolder(boolean hasResponse, boolean wasDisconnected, boolean
wasTimedOut, Optional<Errors> exp) {
+ TestHolder(boolean hasResponse, boolean wasDisconnected, boolean
wasTimedOut, boolean isAuthExp, boolean isVersionMismatch, Optional<Errors>
exp) {
this.hasResponse = hasResponse;
this.wasDisconnected = wasDisconnected;
this.wasTimedOut = wasTimedOut;
+ this.authException = isAuthExp;
+ this.versionMismatch = isVersionMismatch;
this.exp = exp;
}
+
+ @Override
+ public String toString() {
+ return "(" +
+ "hasResponse:" + hasResponse + ", " +
+ "wasDisconnected:" + wasDisconnected + ", " +
+ "wasTimedOut:" + wasTimedOut + ", " +
+ "authException:" + authException + ", " +
+ "versionMismatch:" + versionMismatch + ", " +
+ "expErr:" + (exp.isPresent() ? exp.get() : "<empty>") +
+ ")";
+ }
}
private static Stream<TestHolder> generatorDifferentStates() {
- return Stream.of(
- // Let the actual handler handle since response present.
- new TestHolder(true, false, false, Optional.empty()),
- new TestHolder(true, true, true, Optional.empty()),
- new TestHolder(true, false, true, Optional.empty()),
- new TestHolder(true, true, false, Optional.empty()),
-
- // Handled by checkNetworkError.
- new TestHolder(false, true, false,
Optional.of(Errors.NETWORK_EXCEPTION)),
- new TestHolder(false, false, true,
Optional.of(Errors.REQUEST_TIMED_OUT)),
- new TestHolder(false, true, true,
Optional.of(Errors.NETWORK_EXCEPTION)), // takes precedence
- new TestHolder(false, false, false,
Optional.of(Errors.UNKNOWN_SERVER_ERROR))
- );
+ int combs = 1 << 4; // 2^4 combinations
+ List<TestHolder> holders = new ArrayList<>();
+
+ // Let the actual handler handle since response present.
+ for (int i = 1; i <= combs; i++) {
+ holders.add(new TestHolder(true, (i & 1) != 0, (i & 2) != 0, (i &
4) != 0, (i & 8) != 0, Optional.empty()));
+ }
+
+ // Handled by checkResponseError.
+ for (int i = 1; i <= combs; i++) {
+ boolean disconnect = (i & 1) != 0;
+ boolean timedOut = (i & 2) != 0;
+ boolean authException = (i & 4) != 0;
+ boolean versionMismatch = (i & 8) != 0;
+
+ Optional<Errors> err = Optional.of(Errors.UNKNOWN_SERVER_ERROR);
+ if (authException) {
+ err = Optional.of(Errors.SASL_AUTHENTICATION_FAILED);
+ } else if (versionMismatch) {
+ err = Optional.of(Errors.UNSUPPORTED_VERSION);
+ } else if (disconnect) {
+ err = Optional.of(Errors.NETWORK_EXCEPTION);
+ } else if (timedOut) {
+ err = Optional.of(Errors.REQUEST_TIMED_OUT);
+ }
+
+ holders.add(new TestHolder(false, disconnect, timedOut,
authException, versionMismatch, err));
+ }
+
+ return holders.stream();
}
@ParameterizedTest
@MethodSource("generatorDifferentStates")
- public void testNetworkErrorHandling(TestHolder holder) {
+ public void testResponseErrorHandling(TestHolder holder) {
KafkaClient client = mock(KafkaClient.class);
Timer timer = mock(Timer.class);
PersisterStateManager psm = PersisterStateManagerBuilder
@@ -4563,7 +4598,9 @@ class PersisterStateManagerTest {
when(response.hasResponse()).thenReturn(holder.hasResponse);
when(response.wasDisconnected()).thenReturn(holder.wasDisconnected);
when(response.wasTimedOut()).thenReturn(holder.wasTimedOut);
- assertEquals(holder.exp, handler.checkNetworkError(response, (err,
exp) -> {
- }));
+
when(response.authenticationException()).thenReturn(holder.authException ? new
SaslAuthenticationException("bad stuff") : null);
+ when(response.versionMismatch()).thenReturn(holder.versionMismatch ?
new UnsupportedVersionException("worse stuff") : null);
+ assertEquals(holder.exp, handler.checkResponseError(response, (err,
exp) -> {
+ }), holder.toString());
}
}