This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5dd0153864b KAFKA-19927: Add retry for network and req timeout resp 
errors. (#20998)
5dd0153864b is described below

commit 5dd0153864b0e6df7e0dd7d8d5898949774480cd
Author: Sushant Mahajan <[email protected]>
AuthorDate: Thu Nov 27 15:06:54 2025 +0530

    KAFKA-19927: Add retry for network and req timeout resp errors. (#20998)
    
    * The client response in `PersisterStateManager` for various RPCs could
    contain errors, some of which are retriable.
    * In this PR, the response handlers for the individual RPCs have been
    extended to actually retry on the retriable client response errors
    (NETWORK_EXCEPTION, REQUEST_TIMED_OUT).
    * Tests have been updated and new ones added in
    `PersisterStateManagerTest`.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../share/persister/PersisterStateManager.java     | 733 ++++++++++++---------
 .../share/persister/PersisterStateManagerTest.java | 330 +++++++++-
 2 files changed, 752 insertions(+), 311 deletions(-)

diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
index 9d2b93f93c8..43562ecc177 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
@@ -23,9 +23,7 @@ import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.NetworkException;
-import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
 import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
@@ -80,7 +78,6 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -387,21 +384,15 @@ public class PersisterStateManager {
             }
 
             if (isFindCoordinatorResponse(response)) {
-                Optional<Errors> err = checkResponseError(response, 
this::findCoordinatorErrorResponse);
-                if (err.isEmpty()) {
-                    handleFindCoordinatorResponse(response);
-                }
+                handleFindCoordinatorResponse(response);
             } else if (isResponseForRequest(response)) {
-                Optional<Errors> err = checkResponseError(response, 
this::requestErrorResponse);
-                if (err.isEmpty()) {
-                    handleRequestResponse(response);
-                }
+                handleRequestResponse(response);
             }
             sender.wakeup();
         }
 
         // Visibility for testing
-        Optional<Errors> checkResponseError(ClientResponse response, 
BiConsumer<Errors, Exception> errorConsumer) {
+        Optional<Errors> checkResponseError(ClientResponse response) {
             if (response.hasResponse()) {
                 return Optional.empty();
             }
@@ -411,22 +402,17 @@ public class PersisterStateManager {
             if (response.authenticationException() != null) {
                 log.error("Authentication exception", 
response.authenticationException());
                 Errors error = 
Errors.forException(response.authenticationException());
-                errorConsumer.accept(error, new 
AuthenticationException(String.format("Server response for %s indicates 
authentication exception.", this.partitionKey)));
                 return Optional.of(error);
             } else if (response.versionMismatch() != null) {
                 log.error("Version mismatch exception", 
response.versionMismatch());
                 Errors error = Errors.forException(response.versionMismatch());
-                errorConsumer.accept(error, new 
UnsupportedVersionException(String.format("Server response for %s indicates 
version mismatch.", this.partitionKey)));
                 return Optional.of(error);
-            } else if (response.wasDisconnected()) {
-                errorConsumer.accept(Errors.NETWORK_EXCEPTION, new 
NetworkException(String.format("Server response for %s indicates disconnect.", 
this.partitionKey)));
+            } else if (response.wasDisconnected()) {    // Retriable
                 return Optional.of(Errors.NETWORK_EXCEPTION);
-            } else if (response.wasTimedOut()) {
-                log.error("Response for RPC {} with key {} timed out - {}.", 
name(), this.partitionKey, response);
-                errorConsumer.accept(Errors.REQUEST_TIMED_OUT, new 
NetworkException(String.format("Server response for %s indicates timeout.", 
this.partitionKey)));
+            } else if (response.wasTimedOut()) {    // Retriable
+                log.debug("Response for RPC {} with key {} timed out - {}.", 
name(), this.partitionKey, response);
                 return Optional.of(Errors.REQUEST_TIMED_OUT);
             } else {
-                errorConsumer.accept(Errors.UNKNOWN_SERVER_ERROR, new 
NetworkException(String.format("Server did not provide any response for %s.", 
this.partitionKey)));
                 return Optional.of(Errors.UNKNOWN_SERVER_ERROR);
             }
         }
@@ -445,41 +431,63 @@ public class PersisterStateManager {
 
             // Incrementing the number of find coordinator attempts
             findCoordBackoff.incrementAttempt();
-            List<FindCoordinatorResponseData.Coordinator> coordinators = 
((FindCoordinatorResponse) response.responseBody()).coordinators();
-            if (coordinators.size() != 1) {
-                log.error("Find coordinator response for {} is invalid", 
partitionKey());
-                findCoordinatorErrorResponse(Errors.UNKNOWN_SERVER_ERROR, new 
IllegalStateException("Invalid response with multiple coordinators."));
-                return;
-            }
-
-            FindCoordinatorResponseData.Coordinator coordinatorData = 
coordinators.get(0);
-            Errors error = Errors.forCode(coordinatorData.errorCode());
-            String errorMessage = coordinatorData.errorMessage();
-            if (errorMessage == null || errorMessage.isEmpty()) {
-                errorMessage = error.message();
-            }
+            Errors clientResponseError = 
checkResponseError(response).orElse(Errors.NONE);
+            String clientResponseErrorMessage = clientResponseError.message();
 
-            switch (error) {
+            switch (clientResponseError) {
                 case NONE:
-                    log.debug("Find coordinator response valid. Enqueuing 
actual request.");
-                    findCoordBackoff.resetAttempts();
-                    coordinatorNode = new Node(coordinatorData.nodeId(), 
coordinatorData.host(), coordinatorData.port());
-                    // now we want the actual share state RPC call to happen
-                    if (this.isBatchable()) {
-                        addRequestToNodeMap(coordinatorNode, this);
-                    } else {
-                        enqueue(this);
+                    List<FindCoordinatorResponseData.Coordinator> coordinators 
= ((FindCoordinatorResponse) response.responseBody()).coordinators();
+                    if (coordinators.size() != 1) {
+                        log.error("Find coordinator response for {} is 
invalid. Number of coordinators = {}", partitionKey(), coordinators.size());
+                        
findCoordinatorErrorResponse(Errors.UNKNOWN_SERVER_ERROR, new 
IllegalStateException("Invalid response with multiple coordinators."));
+                        return;
                     }
-                    break;
 
-                case COORDINATOR_NOT_AVAILABLE: // retriable error codes
-                case COORDINATOR_LOAD_IN_PROGRESS:
-                case NOT_COORDINATOR:
-                case UNKNOWN_TOPIC_OR_PARTITION:
-                    log.debug("Received retriable error in find coordinator 
for {} using key {}: {}", name(), partitionKey(), errorMessage);
+                    FindCoordinatorResponseData.Coordinator coordinatorData = 
coordinators.get(0);
+                    Errors error = Errors.forCode(coordinatorData.errorCode());
+                    String errorMessage = coordinatorData.errorMessage();
+                    if (errorMessage == null || errorMessage.isEmpty()) {
+                        errorMessage = error.message();
+                    }
+                    switch (error) {
+                        case NONE:
+                            log.trace("Find coordinator response valid. 
Enqueuing actual request.");
+                            findCoordBackoff.resetAttempts();
+                            coordinatorNode = new 
Node(coordinatorData.nodeId(), coordinatorData.host(), coordinatorData.port());
+                            // now we want the actual share state RPC call to 
happen
+                            if (this.isBatchable()) {
+                                addRequestToNodeMap(coordinatorNode, this);
+                            } else {
+                                enqueue(this);
+                            }
+                            break;
+
+                        case COORDINATOR_NOT_AVAILABLE: // retriable error 
codes
+                        case COORDINATOR_LOAD_IN_PROGRESS:
+                        case NOT_COORDINATOR:
+                        case UNKNOWN_TOPIC_OR_PARTITION:
+                            log.debug("Received retriable error in find 
coordinator for {} using key {}: {}", name(), partitionKey(), errorMessage);
+                            if (!findCoordBackoff.canAttempt()) {
+                                log.error("Exhausted max retries to find 
coordinator for {} using key {} without success.", name(), partitionKey());
+                                findCoordinatorErrorResponse(error, new 
Exception("Exhausted max retries to find coordinator without success."));
+                                break;
+                            }
+                            resetCoordinatorNode();
+                            timer.add(new 
PersisterTimerTask(findCoordBackoff.backOff(), this));
+                            break;
+
+                        default:
+                            log.error("Unable to find coordinator for {} using 
key {}: {}.", name(), partitionKey(), errorMessage);
+                            findCoordinatorErrorResponse(error, new 
Exception(errorMessage));
+                    }
+                    return;
+
+                case NETWORK_EXCEPTION: // Retriable client response error 
codes.
+                case REQUEST_TIMED_OUT:
+                    log.debug("Received retriable error in find coordinator 
client response for {} using key {} due to {}.", name(), partitionKey(), 
clientResponseErrorMessage);
                     if (!findCoordBackoff.canAttempt()) {
-                        log.error("Exhausted max retries to find coordinator 
for {} using key {} without success.", name(), partitionKey());
-                        findCoordinatorErrorResponse(error, new 
Exception("Exhausted max retries to find coordinator without success."));
+                        log.error("Exhausted max retries to find coordinator 
due to error in client response for {} using key {}.", name(), partitionKey());
+                        findCoordinatorErrorResponse(clientResponseError, new 
Exception("Exhausted max retries to find coordinator without success."));
                         break;
                     }
                     resetCoordinatorNode();
@@ -487,8 +495,8 @@ public class PersisterStateManager {
                     break;
 
                 default:
-                    log.error("Unable to find coordinator for {} using key 
{}.", name(), partitionKey());
-                    findCoordinatorErrorResponse(error, new 
Exception(errorMessage));
+                    log.error("Unable to find coordinator due to error in 
client response for {} using key {}: {}", name(), partitionKey(), 
clientResponseError.code());
+                    findCoordinatorErrorResponse(clientResponseError, new 
Exception(clientResponseErrorMessage));
             }
         }
 
@@ -575,64 +583,86 @@ public class PersisterStateManager {
         protected void handleRequestResponse(ClientResponse response) {
             log.debug("Initialize state response received - {}", response);
             initializeStateBackoff.incrementAttempt();
+            Errors clientResponseError = 
checkResponseError(response).orElse(Errors.NONE);
+            String clientResponseErrorMessage = clientResponseError.message();
 
-            // response can be a combined one for large number of requests
-            // we need to deconstruct it
-            InitializeShareGroupStateResponse combinedResponse = 
(InitializeShareGroupStateResponse) response.responseBody();
-
-            for (InitializeShareGroupStateResponseData.InitializeStateResult 
initializeStateResult : combinedResponse.data().results()) {
-                if 
(initializeStateResult.topicId().equals(partitionKey().topicId())) {
-                    
Optional<InitializeShareGroupStateResponseData.PartitionResult> 
partitionStateData =
-                        
initializeStateResult.partitions().stream().filter(partitionResult -> 
partitionResult.partition() == partitionKey().partition())
-                            .findFirst();
-
-                    if (partitionStateData.isPresent()) {
-                        Errors error = 
Errors.forCode(partitionStateData.get().errorCode());
-                        String errorMessage = 
partitionStateData.get().errorMessage();
-                        if (errorMessage == null || errorMessage.isEmpty()) {
-                            errorMessage = error.message();
-                        }
+            switch (clientResponseError) {
+                case NONE:
+                    // response can be a combined one for large number of 
requests
+                    // we need to deconstruct it
+                    InitializeShareGroupStateResponse combinedResponse = 
(InitializeShareGroupStateResponse) response.responseBody();
+
+                    for 
(InitializeShareGroupStateResponseData.InitializeStateResult 
initializeStateResult : combinedResponse.data().results()) {
+                        if 
(initializeStateResult.topicId().equals(partitionKey().topicId())) {
+                            
Optional<InitializeShareGroupStateResponseData.PartitionResult> 
partitionStateData =
+                                
initializeStateResult.partitions().stream().filter(partitionResult -> 
partitionResult.partition() == partitionKey().partition())
+                                    .findFirst();
+
+                            if (partitionStateData.isPresent()) {
+                                Errors error = 
Errors.forCode(partitionStateData.get().errorCode());
+                                String errorMessage = 
partitionStateData.get().errorMessage();
+                                if (errorMessage == null || 
errorMessage.isEmpty()) {
+                                    errorMessage = error.message();
+                                }
 
-                        switch (error) {
-                            case NONE:
-                                initializeStateBackoff.resetAttempts();
-                                
InitializeShareGroupStateResponseData.InitializeStateResult result = 
InitializeShareGroupStateResponse.toResponseInitializeStateResult(
-                                    partitionKey().topicId(),
-                                    List.of(partitionStateData.get())
-                                );
-                                this.result.complete(new 
InitializeShareGroupStateResponse(
-                                    new 
InitializeShareGroupStateResponseData().setResults(List.of(result))));
-                                return;
-
-                            // check retriable errors
-                            case COORDINATOR_NOT_AVAILABLE:
-                            case COORDINATOR_LOAD_IN_PROGRESS:
-                            case NOT_COORDINATOR:
-                            case UNKNOWN_TOPIC_OR_PARTITION:
-                                log.debug("Received retriable error in 
initialize state RPC for key {}: {}", partitionKey(), errorMessage);
-                                if (!initializeStateBackoff.canAttempt()) {
-                                    log.error("Exhausted max retries for 
initialize state RPC for key {} without success.", partitionKey());
-                                    requestErrorResponse(error, new 
Exception("Exhausted max retries to complete initialize state RPC without 
success."));
-                                    return;
+                                switch (error) {
+                                    case NONE:
+                                        initializeStateBackoff.resetAttempts();
+                                        
InitializeShareGroupStateResponseData.InitializeStateResult result = 
InitializeShareGroupStateResponse.toResponseInitializeStateResult(
+                                            partitionKey().topicId(),
+                                            List.of(partitionStateData.get())
+                                        );
+                                        this.result.complete(new 
InitializeShareGroupStateResponse(
+                                            new 
InitializeShareGroupStateResponseData().setResults(List.of(result))));
+                                        return;
+
+                                    // check retriable errors
+                                    case COORDINATOR_NOT_AVAILABLE:
+                                    case COORDINATOR_LOAD_IN_PROGRESS:
+                                    case NOT_COORDINATOR:
+                                    case UNKNOWN_TOPIC_OR_PARTITION:
+                                        log.debug("Received retriable error in 
initialize state RPC for key {}: {}", partitionKey(), errorMessage);
+                                        if 
(!initializeStateBackoff.canAttempt()) {
+                                            log.error("Exhausted max retries 
for initialize state RPC for key {} without success.", partitionKey());
+                                            requestErrorResponse(error, new 
Exception("Exhausted max retries to complete initialize state RPC without 
success."));
+                                            return;
+                                        }
+                                        super.resetCoordinatorNode();
+                                        timer.add(new 
PersisterTimerTask(initializeStateBackoff.backOff(), this));
+                                        return;
+
+                                    default:
+                                        log.error("Unable to perform 
initialize state RPC for key {}: {}", partitionKey(), errorMessage);
+                                        requestErrorResponse(error, new 
Exception(errorMessage));
+                                        return;
                                 }
-                                super.resetCoordinatorNode();
-                                timer.add(new 
PersisterTimerTask(initializeStateBackoff.backOff(), this));
-                                return;
-
-                            default:
-                                log.error("Unable to perform initialize state 
RPC for key {}: {}", partitionKey(), errorMessage);
-                                requestErrorResponse(error, new 
Exception(errorMessage));
-                                return;
+                            }
                         }
                     }
-                }
-            }
 
-            // no response found specific topic partition
-            IllegalStateException exception = new IllegalStateException(
-                "Failed to initialize state for share partition: " + 
partitionKey()
-            );
-            requestErrorResponse(Errors.forException(exception), exception);
+                    // no response found specific topic partition
+                    IllegalStateException exception = new 
IllegalStateException(
+                        "Failed to initialize state for share partition: " + 
partitionKey()
+                    );
+                    requestErrorResponse(Errors.forException(exception), 
exception);
+                    return;
+
+                case NETWORK_EXCEPTION: // Retriable client response error 
codes.
+                case REQUEST_TIMED_OUT:
+                    log.debug("Received retriable error in initialize state 
RPC client response for key {}: {}", partitionKey(), 
clientResponseErrorMessage);
+                    if (!initializeStateBackoff.canAttempt()) {
+                        log.error("Exhausted max retries for initialize state 
RPC due to error in client response for key {}.", partitionKey());
+                        requestErrorResponse(clientResponseError, new 
Exception("Exhausted max retries to complete initialize state RPC without 
success."));
+                        return;
+                    }
+                    super.resetCoordinatorNode();
+                    timer.add(new 
PersisterTimerTask(initializeStateBackoff.backOff(), this));
+                    return;
+
+                default:
+                    log.error("Unable to perform initialize state RPC due to 
error in client response for key {}: {}", partitionKey(), 
clientResponseError.code());
+                    requestErrorResponse(clientResponseError, new 
Exception(clientResponseErrorMessage));
+            }
         }
 
         @Override
@@ -744,64 +774,85 @@ public class PersisterStateManager {
         protected void handleRequestResponse(ClientResponse response) {
             log.debug("Write state response received - {}", response);
             writeStateBackoff.incrementAttempt();
+            Errors clientResponseError = 
checkResponseError(response).orElse(Errors.NONE);
+            String clientResponseErrorMessage = clientResponseError.message();
+            switch (clientResponseError) {
+                case NONE:
+                    // response can be a combined one for large number of 
requests
+                    // we need to deconstruct it
+                    WriteShareGroupStateResponse combinedResponse = 
(WriteShareGroupStateResponse) response.responseBody();
+
+                    for (WriteShareGroupStateResponseData.WriteStateResult 
writeStateResult : combinedResponse.data().results()) {
+                        if 
(writeStateResult.topicId().equals(partitionKey().topicId())) {
+                            
Optional<WriteShareGroupStateResponseData.PartitionResult> partitionStateData =
+                                
writeStateResult.partitions().stream().filter(partitionResult -> 
partitionResult.partition() == partitionKey().partition())
+                                    .findFirst();
+
+                            if (partitionStateData.isPresent()) {
+                                Errors error = 
Errors.forCode(partitionStateData.get().errorCode());
+                                String errorMessage = 
partitionStateData.get().errorMessage();
+                                if (errorMessage == null || 
errorMessage.isEmpty()) {
+                                    errorMessage = error.message();
+                                }
 
-            // response can be a combined one for large number of requests
-            // we need to deconstruct it
-            WriteShareGroupStateResponse combinedResponse = 
(WriteShareGroupStateResponse) response.responseBody();
-
-            for (WriteShareGroupStateResponseData.WriteStateResult 
writeStateResult : combinedResponse.data().results()) {
-                if 
(writeStateResult.topicId().equals(partitionKey().topicId())) {
-                    Optional<WriteShareGroupStateResponseData.PartitionResult> 
partitionStateData =
-                        
writeStateResult.partitions().stream().filter(partitionResult -> 
partitionResult.partition() == partitionKey().partition())
-                            .findFirst();
-
-                    if (partitionStateData.isPresent()) {
-                        Errors error = 
Errors.forCode(partitionStateData.get().errorCode());
-                        String errorMessage = 
partitionStateData.get().errorMessage();
-                        if (errorMessage == null || errorMessage.isEmpty()) {
-                            errorMessage = error.message();
-                        }
-
-                        switch (error) {
-                            case NONE:
-                                writeStateBackoff.resetAttempts();
-                                
WriteShareGroupStateResponseData.WriteStateResult result = 
WriteShareGroupStateResponse.toResponseWriteStateResult(
-                                    partitionKey().topicId(),
-                                    List.of(partitionStateData.get())
-                                );
-                                this.result.complete(new 
WriteShareGroupStateResponse(
-                                    new 
WriteShareGroupStateResponseData().setResults(List.of(result))));
-                                return;
-
-                            // check retriable errors
-                            case COORDINATOR_NOT_AVAILABLE:
-                            case COORDINATOR_LOAD_IN_PROGRESS:
-                            case NOT_COORDINATOR:
-                            case UNKNOWN_TOPIC_OR_PARTITION:
-                                log.debug("Received retriable error in write 
state RPC for key {}: {}", partitionKey(), errorMessage);
-                                if (!writeStateBackoff.canAttempt()) {
-                                    log.error("Exhausted max retries for write 
state RPC for key {} without success.", partitionKey());
-                                    requestErrorResponse(error, new 
Exception("Exhausted max retries to complete write state RPC without 
success."));
-                                    return;
+                                switch (error) {
+                                    case NONE:
+                                        writeStateBackoff.resetAttempts();
+                                        
WriteShareGroupStateResponseData.WriteStateResult result = 
WriteShareGroupStateResponse.toResponseWriteStateResult(
+                                            partitionKey().topicId(),
+                                            List.of(partitionStateData.get())
+                                        );
+                                        this.result.complete(new 
WriteShareGroupStateResponse(
+                                            new 
WriteShareGroupStateResponseData().setResults(List.of(result))));
+                                        return;
+
+                                    // check retriable errors
+                                    case COORDINATOR_NOT_AVAILABLE:
+                                    case COORDINATOR_LOAD_IN_PROGRESS:
+                                    case NOT_COORDINATOR:
+                                    case UNKNOWN_TOPIC_OR_PARTITION:
+                                        log.debug("Received retriable error in 
write state RPC for key {}: {}", partitionKey(), errorMessage);
+                                        if (!writeStateBackoff.canAttempt()) {
+                                            log.error("Exhausted max retries 
for write state RPC for key {} without success.", partitionKey());
+                                            requestErrorResponse(error, new 
Exception("Exhausted max retries to complete write state RPC without 
success."));
+                                            return;
+                                        }
+                                        super.resetCoordinatorNode();
+                                        timer.add(new 
PersisterTimerTask(writeStateBackoff.backOff(), this));
+                                        return;
+
+                                    default:
+                                        log.error("Unable to perform write 
state RPC for key {}: {}", partitionKey(), errorMessage);
+                                        requestErrorResponse(error, new 
Exception(errorMessage));
+                                        return;
                                 }
-                                super.resetCoordinatorNode();
-                                timer.add(new 
PersisterTimerTask(writeStateBackoff.backOff(), this));
-                                return;
-
-                            default:
-                                log.error("Unable to perform write state RPC 
for key {}: {}", partitionKey(), errorMessage);
-                                requestErrorResponse(error, new 
Exception(errorMessage));
-                                return;
+                            }
                         }
                     }
-                }
-            }
 
-            // no response found specific topic partition
-            IllegalStateException exception = new IllegalStateException(
-                "Failed to write state for share partition: " + partitionKey()
-            );
-            requestErrorResponse(Errors.forException(exception), exception);
+                    // no response found specific topic partition
+                    IllegalStateException exception = new 
IllegalStateException(
+                        "Failed to write state for share partition: " + 
partitionKey()
+                    );
+                    requestErrorResponse(Errors.forException(exception), 
exception);
+                    return;
+
+                case NETWORK_EXCEPTION: // Retriable client response error 
codes.
+                case REQUEST_TIMED_OUT:
+                    log.debug("Received retriable error in write state RPC 
client response for key {}: {}", partitionKey(), clientResponseErrorMessage);
+                    if (!writeStateBackoff.canAttempt()) {
+                        log.error("Exhausted max retries for write state RPC 
due to error in client response for key {}.", partitionKey());
+                        requestErrorResponse(clientResponseError, new 
Exception("Exhausted max retries to complete write state RPC without 
success."));
+                        return;
+                    }
+                    super.resetCoordinatorNode();
+                    timer.add(new 
PersisterTimerTask(writeStateBackoff.backOff(), this));
+                    return;
+
+                default:
+                    log.error("Unable to perform write state RPC due to error 
in client response for key {}: {}", partitionKey(), clientResponseError.code());
+                    requestErrorResponse(clientResponseError, new 
Exception(clientResponseErrorMessage));
+            }
         }
 
         @Override
@@ -895,61 +946,83 @@ public class PersisterStateManager {
         protected void handleRequestResponse(ClientResponse response) {
             log.debug("Read state response received - {}", response);
             readStateBackoff.incrementAttempt();
+            Errors clientResponseError = 
checkResponseError(response).orElse(Errors.NONE);
+            String clientResponseErrorMessage = clientResponseError.message();
 
-            ReadShareGroupStateResponse combinedResponse = 
(ReadShareGroupStateResponse) response.responseBody();
-            for (ReadShareGroupStateResponseData.ReadStateResult 
readStateResult : combinedResponse.data().results()) {
-                if 
(readStateResult.topicId().equals(partitionKey().topicId())) {
-                    Optional<ReadShareGroupStateResponseData.PartitionResult> 
partitionStateData =
-                        
readStateResult.partitions().stream().filter(partitionResult -> 
partitionResult.partition() == partitionKey().partition())
-                            .findFirst();
-
-                    if (partitionStateData.isPresent()) {
-                        Errors error = 
Errors.forCode(partitionStateData.get().errorCode());
-                        String errorMessage = 
partitionStateData.get().errorMessage();
-                        if (errorMessage == null || errorMessage.isEmpty()) {
-                            errorMessage = error.message();
-                        }
+            switch (clientResponseError) {
+                case NONE:
+                    ReadShareGroupStateResponse combinedResponse = 
(ReadShareGroupStateResponse) response.responseBody();
+                    for (ReadShareGroupStateResponseData.ReadStateResult 
readStateResult : combinedResponse.data().results()) {
+                        if 
(readStateResult.topicId().equals(partitionKey().topicId())) {
+                            
Optional<ReadShareGroupStateResponseData.PartitionResult> partitionStateData =
+                                
readStateResult.partitions().stream().filter(partitionResult -> 
partitionResult.partition() == partitionKey().partition())
+                                    .findFirst();
+
+                            if (partitionStateData.isPresent()) {
+                                Errors error = 
Errors.forCode(partitionStateData.get().errorCode());
+                                String errorMessage = 
partitionStateData.get().errorMessage();
+                                if (errorMessage == null || 
errorMessage.isEmpty()) {
+                                    errorMessage = error.message();
+                                }
 
-                        switch (error) {
-                            case NONE:
-                                readStateBackoff.resetAttempts();
-                                
ReadShareGroupStateResponseData.ReadStateResult result = 
ReadShareGroupStateResponse.toResponseReadStateResult(
-                                    partitionKey().topicId(),
-                                    List.of(partitionStateData.get())
-                                );
-                                this.result.complete(new 
ReadShareGroupStateResponse(new ReadShareGroupStateResponseData()
-                                    .setResults(List.of(result))));
-                                return;
-
-                            // check retriable errors
-                            case COORDINATOR_NOT_AVAILABLE:
-                            case COORDINATOR_LOAD_IN_PROGRESS:
-                            case NOT_COORDINATOR:
-                            case UNKNOWN_TOPIC_OR_PARTITION:
-                                log.debug("Received retriable error in read 
state RPC for key {}: {}", partitionKey(), errorMessage);
-                                if (!readStateBackoff.canAttempt()) {
-                                    log.error("Exhausted max retries for read 
state RPC for key {} without success.", partitionKey());
-                                    requestErrorResponse(error, new 
Exception("Exhausted max retries to complete read state RPC without success."));
-                                    return;
+                                switch (error) {
+                                    case NONE:
+                                        readStateBackoff.resetAttempts();
+                                        
ReadShareGroupStateResponseData.ReadStateResult result = 
ReadShareGroupStateResponse.toResponseReadStateResult(
+                                            partitionKey().topicId(),
+                                            List.of(partitionStateData.get())
+                                        );
+                                        this.result.complete(new 
ReadShareGroupStateResponse(new ReadShareGroupStateResponseData()
+                                            .setResults(List.of(result))));
+                                        return;
+
+                                    // check retriable errors
+                                    case COORDINATOR_NOT_AVAILABLE:
+                                    case COORDINATOR_LOAD_IN_PROGRESS:
+                                    case NOT_COORDINATOR:
+                                    case UNKNOWN_TOPIC_OR_PARTITION:
+                                        log.debug("Received retriable error in 
read state RPC for key {}: {}", partitionKey(), errorMessage);
+                                        if (!readStateBackoff.canAttempt()) {
+                                            log.error("Exhausted max retries 
for read state RPC for key {} without success.", partitionKey());
+                                            requestErrorResponse(error, new 
Exception("Exhausted max retries to complete read state RPC without success."));
+                                            return;
+                                        }
+                                        super.resetCoordinatorNode();
+                                        timer.add(new 
PersisterTimerTask(readStateBackoff.backOff(), this));
+                                        return;
+
+                                    default:
+                                        log.error("Unable to perform read 
state RPC for key {}: {}", partitionKey(), errorMessage);
+                                        requestErrorResponse(error, new 
Exception(errorMessage));
+                                        return;
                                 }
-                                super.resetCoordinatorNode();
-                                timer.add(new 
PersisterTimerTask(readStateBackoff.backOff(), this));
-                                return;
-
-                            default:
-                                log.error("Unable to perform read state RPC 
for key {}: {}", partitionKey(), errorMessage);
-                                requestErrorResponse(error, new 
Exception(errorMessage));
-                                return;
+                            }
                         }
                     }
-                }
-            }
 
-            // no response found specific topic partition
-            IllegalStateException exception = new IllegalStateException(
-                "Failed to read state for share partition " + partitionKey()
-            );
-            requestErrorResponse(Errors.forException(exception), exception);
+                    // no response found for the specific topic partition
+                    IllegalStateException exception = new 
IllegalStateException(
+                        "Failed to read state for share partition " + 
partitionKey()
+                    );
+                    requestErrorResponse(Errors.forException(exception), 
exception);
+                    return;
+
+                case NETWORK_EXCEPTION: // Retriable client response error 
codes.
+                case REQUEST_TIMED_OUT:
+                    log.debug("Received retriable error in read state RPC 
client response for key {}: {}", partitionKey(), clientResponseErrorMessage);
+                    if (!readStateBackoff.canAttempt()) {
+                        log.error("Exhausted max retries for read state RPC 
due to error in client response for key {}.", partitionKey());
+                        requestErrorResponse(clientResponseError, new 
Exception("Exhausted max retries to complete read state RPC without success."));
+                        return;
+                    }
+                    super.resetCoordinatorNode();
+                    timer.add(new 
PersisterTimerTask(readStateBackoff.backOff(), this));
+                    return;
+
+                default:
+                    log.error("Unable to perform read state RPC due to error 
in client response for key {}: {}", partitionKey(), clientResponseError.code());
+                    requestErrorResponse(clientResponseError, new 
Exception(clientResponseErrorMessage));
+            }
         }
 
         @Override
@@ -1043,61 +1116,83 @@ public class PersisterStateManager {
         protected void handleRequestResponse(ClientResponse response) {
             log.debug("Read state summary response received - {}", response);
             readStateSummaryBackoff.incrementAttempt();
+            Errors clientResponseError = 
checkResponseError(response).orElse(Errors.NONE);
+            String clientResponseErrorMessage = clientResponseError.message();
 
-            ReadShareGroupStateSummaryResponse combinedResponse = 
(ReadShareGroupStateSummaryResponse) response.responseBody();
-            for (ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult 
readStateSummaryResult : combinedResponse.data().results()) {
-                if 
(readStateSummaryResult.topicId().equals(partitionKey().topicId())) {
-                    
Optional<ReadShareGroupStateSummaryResponseData.PartitionResult> 
partitionStateData =
-                        
readStateSummaryResult.partitions().stream().filter(partitionResult -> 
partitionResult.partition() == partitionKey().partition())
-                            .findFirst();
-
-                    if (partitionStateData.isPresent()) {
-                        Errors error = 
Errors.forCode(partitionStateData.get().errorCode());
-                        String errorMessage = 
partitionStateData.get().errorMessage();
-                        if (errorMessage == null || errorMessage.isEmpty()) {
-                            errorMessage = error.message();
-                        }
+            switch (clientResponseError) {
+                case NONE:
+                    ReadShareGroupStateSummaryResponse combinedResponse = 
(ReadShareGroupStateSummaryResponse) response.responseBody();
+                    for 
(ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult 
readStateSummaryResult : combinedResponse.data().results()) {
+                        if 
(readStateSummaryResult.topicId().equals(partitionKey().topicId())) {
+                            
Optional<ReadShareGroupStateSummaryResponseData.PartitionResult> 
partitionStateData =
+                                
readStateSummaryResult.partitions().stream().filter(partitionResult -> 
partitionResult.partition() == partitionKey().partition())
+                                    .findFirst();
+
+                            if (partitionStateData.isPresent()) {
+                                Errors error = 
Errors.forCode(partitionStateData.get().errorCode());
+                                String errorMessage = 
partitionStateData.get().errorMessage();
+                                if (errorMessage == null || 
errorMessage.isEmpty()) {
+                                    errorMessage = error.message();
+                                }
 
-                        switch (error) {
-                            case NONE:
-                                readStateSummaryBackoff.resetAttempts();
-                                
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult result = 
ReadShareGroupStateSummaryResponse.toResponseReadStateSummaryResult(
-                                    partitionKey().topicId(),
-                                    List.of(partitionStateData.get())
-                                );
-                                this.result.complete(new 
ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData()
-                                    .setResults(List.of(result))));
-                                return;
-
-                            // check retriable errors
-                            case COORDINATOR_NOT_AVAILABLE:
-                            case COORDINATOR_LOAD_IN_PROGRESS:
-                            case NOT_COORDINATOR:
-                            case UNKNOWN_TOPIC_OR_PARTITION:
-                                log.debug("Received retriable error in read 
state summary RPC for key {}: {}", partitionKey(), errorMessage);
-                                if (!readStateSummaryBackoff.canAttempt()) {
-                                    log.error("Exhausted max retries for read 
state summary RPC for key {} without success.", partitionKey());
-                                    requestErrorResponse(error, new 
Exception("Exhausted max retries to complete read state summary RPC without 
success."));
-                                    return;
+                                switch (error) {
+                                    case NONE:
+                                        
readStateSummaryBackoff.resetAttempts();
+                                        
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult result = 
ReadShareGroupStateSummaryResponse.toResponseReadStateSummaryResult(
+                                            partitionKey().topicId(),
+                                            List.of(partitionStateData.get())
+                                        );
+                                        this.result.complete(new 
ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData()
+                                            .setResults(List.of(result))));
+                                        return;
+
+                                    // check retriable errors
+                                    case COORDINATOR_NOT_AVAILABLE:
+                                    case COORDINATOR_LOAD_IN_PROGRESS:
+                                    case NOT_COORDINATOR:
+                                    case UNKNOWN_TOPIC_OR_PARTITION:
+                                        log.debug("Received retriable error in 
read state summary RPC for key {}: {}", partitionKey(), errorMessage);
+                                        if 
(!readStateSummaryBackoff.canAttempt()) {
+                                            log.error("Exhausted max retries 
for read state summary RPC for key {} without success.", partitionKey());
+                                            requestErrorResponse(error, new 
Exception("Exhausted max retries to complete read state summary RPC without 
success."));
+                                            return;
+                                        }
+                                        super.resetCoordinatorNode();
+                                        timer.add(new 
PersisterTimerTask(readStateSummaryBackoff.backOff(), this));
+                                        return;
+
+                                    default:
+                                        log.error("Unable to perform read 
state summary RPC for key {}: {}", partitionKey(), errorMessage);
+                                        requestErrorResponse(error, new 
Exception(errorMessage));
+                                        return;
                                 }
-                                super.resetCoordinatorNode();
-                                timer.add(new 
PersisterTimerTask(readStateSummaryBackoff.backOff(), this));
-                                return;
-
-                            default:
-                                log.error("Unable to perform read state 
summary RPC for key {}: {}", partitionKey(), errorMessage);
-                                requestErrorResponse(error, new 
Exception(errorMessage));
-                                return;
+                            }
                         }
                     }
-                }
-            }
 
-            // no response found specific topic partition
-            IllegalStateException exception = new IllegalStateException(
-                "Failed to read state summary for share partition " + 
partitionKey()
-            );
-            requestErrorResponse(Errors.forException(exception), exception);
+                    // no response found for the specific topic partition
+                    IllegalStateException exception = new 
IllegalStateException(
+                        "Failed to read state summary for share partition " + 
partitionKey()
+                    );
+                    requestErrorResponse(Errors.forException(exception), 
exception);
+                    return;
+
+                case NETWORK_EXCEPTION: // Retriable client response error 
codes.
+                case REQUEST_TIMED_OUT:
+                    log.debug("Received retriable error in read state summary 
RPC client response for key {}: {}", partitionKey(), 
clientResponseErrorMessage);
+                    if (!readStateSummaryBackoff.canAttempt()) {
+                        log.error("Exhausted max retries for read state 
summary RPC due to error in client response for key {}.", partitionKey());
+                        requestErrorResponse(clientResponseError, new 
Exception("Exhausted max retries to complete read state summary RPC without 
success."));
+                        return;
+                    }
+                    super.resetCoordinatorNode();
+                    timer.add(new 
PersisterTimerTask(readStateSummaryBackoff.backOff(), this));
+                    return;
+
+                default:
+                    log.error("Unable to perform read state summary RPC due to 
error in client response for key {}: {}", partitionKey(), 
clientResponseError.code());
+                    requestErrorResponse(clientResponseError, new 
Exception(clientResponseErrorMessage));
+            }
         }
 
         @Override
@@ -1184,65 +1279,87 @@ public class PersisterStateManager {
         protected void handleRequestResponse(ClientResponse response) {
             log.debug("Delete state response received - {}", response);
             deleteStateBackoff.incrementAttempt();
+            Errors clientResponseError = 
checkResponseError(response).orElse(Errors.NONE);
+            String clientResponseErrorMessage = clientResponseError.message();
 
-            // response can be a combined one for large number of requests
-            // we need to deconstruct it
-            DeleteShareGroupStateResponse combinedResponse = 
(DeleteShareGroupStateResponse) response.responseBody();
-
-            for (DeleteShareGroupStateResponseData.DeleteStateResult 
deleteStateResult : combinedResponse.data().results()) {
-                if 
(deleteStateResult.topicId().equals(partitionKey().topicId())) {
-                    
Optional<DeleteShareGroupStateResponseData.PartitionResult> partitionStateData =
-                        deleteStateResult.partitions().stream()
-                            .filter(partitionResult -> 
partitionResult.partition() == partitionKey().partition())
-                            .findFirst();
-
-                    if (partitionStateData.isPresent()) {
-                        Errors error = 
Errors.forCode(partitionStateData.get().errorCode());
-                        String errorMessage = 
partitionStateData.get().errorMessage();
-                        if (errorMessage == null || errorMessage.isEmpty()) {
-                            errorMessage = error.message();
-                        }
+            switch (clientResponseError) {
+                case NONE:
+                    // response can be a combined one for large number of 
requests
+                    // we need to deconstruct it
+                    DeleteShareGroupStateResponse combinedResponse = 
(DeleteShareGroupStateResponse) response.responseBody();
+
+                    for (DeleteShareGroupStateResponseData.DeleteStateResult 
deleteStateResult : combinedResponse.data().results()) {
+                        if 
(deleteStateResult.topicId().equals(partitionKey().topicId())) {
+                            
Optional<DeleteShareGroupStateResponseData.PartitionResult> partitionStateData =
+                                deleteStateResult.partitions().stream()
+                                    .filter(partitionResult -> 
partitionResult.partition() == partitionKey().partition())
+                                    .findFirst();
+
+                            if (partitionStateData.isPresent()) {
+                                Errors error = 
Errors.forCode(partitionStateData.get().errorCode());
+                                String errorMessage = 
partitionStateData.get().errorMessage();
+                                if (errorMessage == null || 
errorMessage.isEmpty()) {
+                                    errorMessage = error.message();
+                                }
 
-                        switch (error) {
-                            case NONE:
-                                deleteStateBackoff.resetAttempts();
-                                
DeleteShareGroupStateResponseData.DeleteStateResult result = 
DeleteShareGroupStateResponse.toResponseDeleteStateResult(
-                                    partitionKey().topicId(),
-                                    List.of(partitionStateData.get())
-                                );
-                                this.result.complete(new 
DeleteShareGroupStateResponse(
-                                    new 
DeleteShareGroupStateResponseData().setResults(List.of(result))));
-                                return;
-
-                            // check retriable errors
-                            case COORDINATOR_NOT_AVAILABLE:
-                            case COORDINATOR_LOAD_IN_PROGRESS:
-                            case NOT_COORDINATOR:
-                            case UNKNOWN_TOPIC_OR_PARTITION:
-                                log.debug("Received retriable error in delete 
state RPC for key {}: {}", partitionKey(), errorMessage);
-                                if (!deleteStateBackoff.canAttempt()) {
-                                    log.error("Exhausted max retries for 
delete state RPC for key {} without success.", partitionKey());
-                                    requestErrorResponse(error, new 
Exception("Exhausted max retries to complete delete state RPC without 
success."));
-                                    return;
+                                switch (error) {
+                                    case NONE:
+                                        deleteStateBackoff.resetAttempts();
+                                        
DeleteShareGroupStateResponseData.DeleteStateResult result = 
DeleteShareGroupStateResponse.toResponseDeleteStateResult(
+                                            partitionKey().topicId(),
+                                            List.of(partitionStateData.get())
+                                        );
+                                        this.result.complete(new 
DeleteShareGroupStateResponse(
+                                            new 
DeleteShareGroupStateResponseData().setResults(List.of(result))));
+                                        return;
+
+                                    // check retriable errors
+                                    case COORDINATOR_NOT_AVAILABLE:
+                                    case COORDINATOR_LOAD_IN_PROGRESS:
+                                    case NOT_COORDINATOR:
+                                    case UNKNOWN_TOPIC_OR_PARTITION:
+                                        log.debug("Received retriable error in 
delete state RPC for key {}: {}", partitionKey(), errorMessage);
+                                        if (!deleteStateBackoff.canAttempt()) {
+                                            log.error("Exhausted max retries 
for delete state RPC for key {} without success.", partitionKey());
+                                            requestErrorResponse(error, new 
Exception("Exhausted max retries to complete delete state RPC without 
success."));
+                                            return;
+                                        }
+                                        super.resetCoordinatorNode();
+                                        timer.add(new 
PersisterTimerTask(deleteStateBackoff.backOff(), this));
+                                        return;
+
+                                    default:
+                                        log.error("Unable to perform delete 
state RPC for key {}: {}", partitionKey(), errorMessage);
+                                        requestErrorResponse(error, new 
Exception(errorMessage));
+                                        return;
                                 }
-                                super.resetCoordinatorNode();
-                                timer.add(new 
PersisterTimerTask(deleteStateBackoff.backOff(), this));
-                                return;
-
-                            default:
-                                log.error("Unable to perform delete state RPC 
for key {}: {}", partitionKey(), errorMessage);
-                                requestErrorResponse(error, new 
Exception(errorMessage));
-                                return;
+                            }
                         }
                     }
-                }
-            }
 
-            // no response found specific topic partition
-            IllegalStateException exception = new IllegalStateException(
-                "Failed to delete state for share partition: " + partitionKey()
-            );
-            requestErrorResponse(Errors.forException(exception), exception);
+                    // no response found for the specific topic partition
+                    IllegalStateException exception = new 
IllegalStateException(
+                        "Failed to delete state for share partition: " + 
partitionKey()
+                    );
+                    requestErrorResponse(Errors.forException(exception), 
exception);
+                    return;
+
+                case NETWORK_EXCEPTION: // Retriable client response error 
codes.
+                case REQUEST_TIMED_OUT:
+                    log.debug("Received retriable error in delete state RPC 
client response for key {}: {}", partitionKey(), clientResponseErrorMessage);
+                    if (!deleteStateBackoff.canAttempt()) {
+                        log.error("Exhausted max retries for delete state RPC 
due to error in client response for key {}.", partitionKey());
+                        requestErrorResponse(clientResponseError, new 
Exception("Exhausted max retries to complete delete state RPC without 
success."));
+                        return;
+                    }
+                    super.resetCoordinatorNode();
+                    timer.add(new 
PersisterTimerTask(deleteStateBackoff.backOff(), this));
+                    return;
+
+                default:
+                    log.error("Unable to perform delete state RPC due to error 
in client response for key {}: {}", partitionKey(), clientResponseError.code());
+                    requestErrorResponse(clientResponseError, new 
Exception(clientResponseErrorMessage));
+            }
         }
 
         @Override
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
index bdce1713817..bd4f87ae19d 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
@@ -466,7 +466,7 @@ class PersisterStateManagerTest {
         }
 
         assertEquals(Errors.NETWORK_EXCEPTION.code(), 
result.data().results().get(0).partitions().get(0).errorCode());
-        verify(handler, times(1)).findShareCoordinatorBuilder();
+        verify(handler, times(5)).findShareCoordinatorBuilder();    // 
Retriable exception
 
         try {
             // Stopping the state manager
@@ -1373,6 +1373,77 @@ class PersisterStateManagerTest {
         }
     }
 
+    @Test
+    public void testWriteStateClientResponseErrorRetriesExhausted() {
+        MockClient client = new MockClient(MOCK_TIME);
+
+        String groupId = "group1";
+        Uuid topicId = Uuid.randomUuid();
+        int partition = 10;
+        List<PersisterStateBatch> stateBatches = List.of(
+            new PersisterStateBatch(0, 9, (byte) 0, (short) 1),
+            new PersisterStateBatch(10, 19, (byte) 1, (short) 1)
+        );
+
+        Node coordinatorNode = new Node(1, HOST, PORT);
+        client.setUnreachable(coordinatorNode, 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS + 1);
+
+        ShareCoordinatorMetadataCacheHelper cacheHelper = 
getCoordinatorCacheHelper(coordinatorNode);
+
+        PersisterStateManager stateManager = 
PersisterStateManagerBuilder.builder()
+            .withKafkaClient(client)
+            .withTimer(mockTimer)
+            .withCacheHelper(cacheHelper)
+            .build();
+
+        stateManager.start();
+
+        CompletableFuture<WriteShareGroupStateResponse> future = new 
CompletableFuture<>();
+
+        PersisterStateManager.WriteStateHandler handler = spy(stateManager.new 
WriteStateHandler(
+            groupId,
+            topicId,
+            partition,
+            0,
+            0,
+            0,
+            0,
+            stateBatches,
+            future,
+            REQUEST_BACKOFF_MS,
+            REQUEST_BACKOFF_MAX_MS,
+            2
+        ));
+
+        stateManager.enqueue(handler);
+
+        CompletableFuture<WriteShareGroupStateResponse> resultFuture = 
handler.result();
+
+        WriteShareGroupStateResponse result = null;
+        try {
+            result = resultFuture.get();
+        } catch (Exception e) {
+            fail("Failed to get result from future", e);
+        }
+
+        WriteShareGroupStateResponseData.PartitionResult partitionResult = 
result.data().results().get(0).partitions().get(0);
+
+        // Verifying the coordinator node was populated correctly by the 
FIND_COORDINATOR request
+        assertEquals(coordinatorNode, handler.getCoordinatorNode());
+
+        // Verifying the result returned is correct
+        assertEquals(partition, partitionResult.partition());
+        assertEquals(Errors.NETWORK_EXCEPTION.code(), 
partitionResult.errorCode());
+        verify(handler, times(2)).handleRequestResponse(any());    // 
Retriable exception
+
+        try {
+            // Stopping the state manager
+            stateManager.stop();
+        } catch (Exception e) {
+            fail("Failed to stop state manager", e);
+        }
+    }
+
     @Test
     public void testWriteStateRequestBatchingWithCoordinatorNodeLookup() 
throws ExecutionException, Exception {
         MockClient client = new MockClient(MOCK_TIME);
@@ -2234,6 +2305,70 @@ class PersisterStateManagerTest {
         }
     }
 
+    @Test
+    public void testReadStateClientResponseErrorRetriesExhausted() {
+        MockClient client = new MockClient(MOCK_TIME);
+
+        String groupId = "group1";
+        Uuid topicId = Uuid.randomUuid();
+        int partition = 10;
+
+        Node coordinatorNode = new Node(1, HOST, PORT);
+        client.setUnreachable(coordinatorNode, 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS + 1);
+
+        ShareCoordinatorMetadataCacheHelper cacheHelper = 
getCoordinatorCacheHelper(coordinatorNode);
+
+        PersisterStateManager stateManager = 
PersisterStateManagerBuilder.builder()
+            .withKafkaClient(client)
+            .withTimer(mockTimer)
+            .withCacheHelper(cacheHelper)
+            .build();
+
+        stateManager.start();
+
+        CompletableFuture<ReadShareGroupStateResponse> future = new 
CompletableFuture<>();
+
+        PersisterStateManager.ReadStateHandler handler = spy(stateManager.new 
ReadStateHandler(
+            groupId,
+            topicId,
+            partition,
+            0,
+            future,
+            REQUEST_BACKOFF_MS,
+            REQUEST_BACKOFF_MAX_MS,
+            2,
+            null
+        ));
+
+        stateManager.enqueue(handler);
+
+        CompletableFuture<ReadShareGroupStateResponse> resultFuture = 
handler.result();
+
+        ReadShareGroupStateResponse result = null;
+        try {
+            result = resultFuture.get();
+        } catch (Exception e) {
+            fail("Failed to get result from future", e);
+        }
+
+        ReadShareGroupStateResponseData.PartitionResult partitionResult = 
result.data().results().get(0).partitions().get(0);
+
+        // Verifying the coordinator node was populated correctly by the 
FIND_COORDINATOR request
+        assertEquals(coordinatorNode, handler.getCoordinatorNode());
+
+        // Verifying the result returned is correct
+        assertEquals(partition, partitionResult.partition());
+        assertEquals(Errors.NETWORK_EXCEPTION.code(), 
partitionResult.errorCode());
+        verify(handler, times(2)).handleRequestResponse(any());    // 
Retriable exception
+
+        try {
+            // Stopping the state manager
+            stateManager.stop();
+        } catch (Exception e) {
+            fail("Failed to stop state manager", e);
+        }
+    }
+
     @Test
     public void testReadStateSummaryRequestCoordinatorFoundSuccessfully() {
         MockClient client = new MockClient(MOCK_TIME);
@@ -3004,6 +3139,70 @@ class PersisterStateManagerTest {
         }
     }
 
+    @Test
+    public void testReadStateSummaryClientResponseErrorRetriesExhausted() {
+        MockClient client = new MockClient(MOCK_TIME);
+
+        String groupId = "group1";
+        Uuid topicId = Uuid.randomUuid();
+        int partition = 10;
+
+        Node coordinatorNode = new Node(1, HOST, PORT);
+        client.setUnreachable(coordinatorNode, 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS + 1);
+
+        ShareCoordinatorMetadataCacheHelper cacheHelper = 
getCoordinatorCacheHelper(coordinatorNode);
+
+        PersisterStateManager stateManager = 
PersisterStateManagerBuilder.builder()
+            .withKafkaClient(client)
+            .withTimer(mockTimer)
+            .withCacheHelper(cacheHelper)
+            .build();
+
+        stateManager.start();
+
+        CompletableFuture<ReadShareGroupStateSummaryResponse> future = new 
CompletableFuture<>();
+
+        PersisterStateManager.ReadStateSummaryHandler handler = 
spy(stateManager.new ReadStateSummaryHandler(
+            groupId,
+            topicId,
+            partition,
+            0,
+            future,
+            REQUEST_BACKOFF_MS,
+            REQUEST_BACKOFF_MAX_MS,
+            2,
+            null
+        ));
+
+        stateManager.enqueue(handler);
+
+        CompletableFuture<ReadShareGroupStateSummaryResponse> resultFuture = 
handler.result();
+
+        ReadShareGroupStateSummaryResponse result = null;
+        try {
+            result = resultFuture.get();
+        } catch (Exception e) {
+            fail("Failed to get result from future", e);
+        }
+
+        ReadShareGroupStateSummaryResponseData.PartitionResult partitionResult 
= result.data().results().get(0).partitions().get(0);
+
+        // Verifying the coordinator node was populated correctly by the 
FIND_COORDINATOR request
+        assertEquals(coordinatorNode, handler.getCoordinatorNode());
+
+        // Verifying the result returned is correct
+        assertEquals(partition, partitionResult.partition());
+        assertEquals(Errors.NETWORK_EXCEPTION.code(), 
partitionResult.errorCode());
+        verify(handler, times(2)).handleRequestResponse(any());    // 
Retriable exception
+
+        try {
+            // Stopping the state manager
+            stateManager.stop();
+        } catch (Exception e) {
+            fail("Failed to stop state manager", e);
+        }
+    }
+
     @Test
     public void testDeleteStateRequestCoordinatorFoundSuccessfully() {
         MockClient client = new MockClient(MOCK_TIME);
@@ -3726,6 +3925,68 @@ class PersisterStateManagerTest {
         TestUtils.waitForCondition(isBatchingSuccess::get, 
TestUtils.DEFAULT_MAX_WAIT_MS, 10L, () -> "unable to verify batching");
     }
 
+    @Test
+    public void testDeleteStateClientResponseErrorRetriesExhausted() {
+        MockClient client = new MockClient(MOCK_TIME);
+
+        String groupId = "group1";
+        Uuid topicId = Uuid.randomUuid();
+        int partition = 10;
+
+        Node coordinatorNode = new Node(1, HOST, PORT);
+        client.setUnreachable(coordinatorNode, 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS + 1);
+
+        ShareCoordinatorMetadataCacheHelper cacheHelper = 
getCoordinatorCacheHelper(coordinatorNode);
+
+        PersisterStateManager stateManager = 
PersisterStateManagerBuilder.builder()
+            .withKafkaClient(client)
+            .withTimer(mockTimer)
+            .withCacheHelper(cacheHelper)
+            .build();
+
+        stateManager.start();
+
+        CompletableFuture<DeleteShareGroupStateResponse> future = new 
CompletableFuture<>();
+
+        PersisterStateManager.DeleteStateHandler handler = 
spy(stateManager.new DeleteStateHandler(
+            groupId,
+            topicId,
+            partition,
+            future,
+            REQUEST_BACKOFF_MS,
+            REQUEST_BACKOFF_MAX_MS,
+            2
+        ));
+
+        stateManager.enqueue(handler);
+
+        CompletableFuture<DeleteShareGroupStateResponse> resultFuture = 
handler.result();
+
+        DeleteShareGroupStateResponse result = null;
+        try {
+            result = resultFuture.get();
+        } catch (Exception e) {
+            fail("Failed to get result from future", e);
+        }
+
+        DeleteShareGroupStateResponseData.PartitionResult partitionResult = 
result.data().results().get(0).partitions().get(0);
+
+        // Verifying the coordinator node was populated correctly by the 
FIND_COORDINATOR request
+        assertEquals(coordinatorNode, handler.getCoordinatorNode());
+
+        // Verifying the result returned is correct
+        assertEquals(partition, partitionResult.partition());
+        assertEquals(Errors.NETWORK_EXCEPTION.code(), 
partitionResult.errorCode());
+        verify(handler, times(2)).handleRequestResponse(any());    // 
Retriable exception
+
+        try {
+            // Stopping the state manager
+            stateManager.stop();
+        } catch (Exception e) {
+            fail("Failed to stop state manager", e);
+        }
+    }
+
     @Test
     public void testInitializeStateRequestCoordinatorFoundSuccessfully() {
         MockClient client = new MockClient(MOCK_TIME);
@@ -4476,6 +4737,70 @@ class PersisterStateManagerTest {
         TestUtils.waitForCondition(isBatchingSuccess::get, 
TestUtils.DEFAULT_MAX_WAIT_MS, 10L, () -> "unable to verify batching");
     }
 
+    @Test
+    public void testInitializeStateClientResponseErrorRetriesExhausted() {
+        MockClient client = new MockClient(MOCK_TIME);
+
+        String groupId = "group1";
+        Uuid topicId = Uuid.randomUuid();
+        int partition = 10;
+
+        Node coordinatorNode = new Node(1, HOST, PORT);
+        client.setUnreachable(coordinatorNode, 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS + 1);
+
+        ShareCoordinatorMetadataCacheHelper cacheHelper = 
getCoordinatorCacheHelper(coordinatorNode);
+
+        PersisterStateManager stateManager = 
PersisterStateManagerBuilder.builder()
+            .withKafkaClient(client)
+            .withTimer(mockTimer)
+            .withCacheHelper(cacheHelper)
+            .build();
+
+        stateManager.start();
+
+        CompletableFuture<InitializeShareGroupStateResponse> future = new 
CompletableFuture<>();
+
+        PersisterStateManager.InitializeStateHandler handler = 
spy(stateManager.new InitializeStateHandler(
+            groupId,
+            topicId,
+            partition,
+            0,
+            0L,
+            future,
+            REQUEST_BACKOFF_MS,
+            REQUEST_BACKOFF_MAX_MS,
+            2
+        ));
+
+        stateManager.enqueue(handler);
+
+        CompletableFuture<InitializeShareGroupStateResponse> resultFuture = 
handler.result();
+
+        InitializeShareGroupStateResponse result = null;
+        try {
+            result = resultFuture.get();
+        } catch (Exception e) {
+            fail("Failed to get result from future", e);
+        }
+
+        InitializeShareGroupStateResponseData.PartitionResult partitionResult 
= result.data().results().get(0).partitions().get(0);
+
+        // Verifying the coordinator node was populated correctly by the 
FIND_COORDINATOR request
+        assertEquals(coordinatorNode, handler.getCoordinatorNode());
+
+        // Verifying the result returned is correct
+        assertEquals(partition, partitionResult.partition());
+        assertEquals(Errors.NETWORK_EXCEPTION.code(), 
partitionResult.errorCode());
+        verify(handler, times(2)).handleRequestResponse(any());    // 
Retriable exception
+
+        try {
+            // Stopping the state manager
+            stateManager.stop();
+        } catch (Exception e) {
+            fail("Failed to stop state manager", e);
+        }
+    }
+
     @Test
     public void testPersisterStateManagerClose() {
         KafkaClient client = mock(KafkaClient.class);
@@ -4600,7 +4925,6 @@ class PersisterStateManagerTest {
         when(response.wasTimedOut()).thenReturn(holder.wasTimedOut);
         
when(response.authenticationException()).thenReturn(holder.authException ? new 
SaslAuthenticationException("bad stuff") : null);
         when(response.versionMismatch()).thenReturn(holder.versionMismatch ? 
new UnsupportedVersionException("worse stuff") : null);
-        assertEquals(holder.exp, handler.checkResponseError(response, (err, 
exp) -> {
-        }), holder.toString());
+        assertEquals(holder.exp, handler.checkResponseError(response), 
holder.toString());
     }
 }

Reply via email to