This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
     new b514023064e KAFKA-19927: Add retry for network and req timeout resp errors. (#20998)
b514023064e is described below
commit b514023064e076a9bf300f9400d5c18e167ddb31
Author: Sushant Mahajan <[email protected]>
AuthorDate: Thu Nov 27 15:06:54 2025 +0530
KAFKA-19927: Add retry for network and req timeout resp errors. (#20998)
    * The client response in `PersisterStateManager` for various RPCs could
    contain errors, some of which are retriable.
    * In this PR, the handlers for the individual RPCs have been extended to
    retry on these retriable errors (NETWORK_EXCEPTION, REQUEST_TIMED_OUT); a
    simplified sketch of the retry pattern follows below.
    * Tests have been updated and new ones added in
    `PersisterStateManagerTest`.
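
    For illustration only -- a simplified, self-contained sketch of the retry
    pattern the handlers now follow, not the actual PersisterStateManager code.
    The type and helper names below (Response, Backoff, handle, resend) are
    stand-ins, not the real Kafka APIs: transport-level failures in the client
    response are classified as NETWORK_EXCEPTION or REQUEST_TIMED_OUT and, while
    backoff attempts remain, the request is resent after a delay; otherwise the
    pending future is completed with the error.

        import java.util.Optional;
        import java.util.concurrent.CompletableFuture;
        import java.util.concurrent.TimeUnit;

        // Simplified sketch of the retry-on-client-response-error pattern.
        // Types and names here are stand-ins, not the real Kafka classes.
        final class RetrySketch {

            enum Error { NONE, NETWORK_EXCEPTION, REQUEST_TIMED_OUT, UNKNOWN_SERVER_ERROR }

            // Minimal stand-in for ClientResponse.
            interface Response {
                boolean hasBody();          // ~ ClientResponse.hasResponse()
                boolean wasDisconnected();  // ~ ClientResponse.wasDisconnected()
                boolean wasTimedOut();      // ~ ClientResponse.wasTimedOut()
            }

            // Minimal stand-in for the handler's backoff manager.
            static final class Backoff {
                private final int maxAttempts;
                private long delayMs;
                private int attempts;
                Backoff(long initialDelayMs, int maxAttempts) {
                    this.delayMs = initialDelayMs;
                    this.maxAttempts = maxAttempts;
                }
                boolean canAttempt() { return attempts < maxAttempts; }
                long nextDelayMs() { attempts++; long d = delayMs; delayMs *= 2; return d; } // exponential backoff
            }

            // Mirrors the reworked checkResponseError(): classify transport-level failures.
            static Optional<Error> checkResponseError(Response r) {
                if (r.hasBody()) return Optional.empty();
                if (r.wasDisconnected()) return Optional.of(Error.NETWORK_EXCEPTION); // retriable
                if (r.wasTimedOut()) return Optional.of(Error.REQUEST_TIMED_OUT);     // retriable
                return Optional.of(Error.UNKNOWN_SERVER_ERROR);                       // not retried
            }

            // Mirrors the shape of the per-RPC handlers: retry retriable client response
            // errors with backoff, otherwise complete the pending future with the error.
            static void handle(Response r, Backoff backoff, Runnable resend, CompletableFuture<Void> result) {
                switch (checkResponseError(r).orElse(Error.NONE)) {
                    case NONE:
                        result.complete(null); // the real handlers deconstruct the response body here
                        return;
                    case NETWORK_EXCEPTION:
                    case REQUEST_TIMED_OUT:
                        if (backoff.canAttempt()) {
                            // the real code schedules a PersisterTimerTask; a delayed executor stands in here
                            CompletableFuture.delayedExecutor(backoff.nextDelayMs(), TimeUnit.MILLISECONDS).execute(resend);
                        } else {
                            result.completeExceptionally(new RuntimeException("Exhausted retries without success."));
                        }
                        return;
                    default:
                        result.completeExceptionally(new RuntimeException("Fatal client response error."));
                }
            }
        }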
Reviewers: Andrew Schofield <[email protected]>
---
.../share/persister/PersisterStateManager.java | 733 ++++++++++++---------
.../share/persister/PersisterStateManagerTest.java | 330 +++++++++-
2 files changed, 752 insertions(+), 311 deletions(-)
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
index 9d2b93f93c8..43562ecc177 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
@@ -23,9 +23,7 @@ import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.NetworkException;
-import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
@@ -80,7 +78,6 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -387,21 +384,15 @@ public class PersisterStateManager {
             }
             if (isFindCoordinatorResponse(response)) {
-                Optional<Errors> err = checkResponseError(response, this::findCoordinatorErrorResponse);
-                if (err.isEmpty()) {
-                    handleFindCoordinatorResponse(response);
-                }
+                handleFindCoordinatorResponse(response);
             } else if (isResponseForRequest(response)) {
-                Optional<Errors> err = checkResponseError(response, this::requestErrorResponse);
-                if (err.isEmpty()) {
-                    handleRequestResponse(response);
-                }
+                handleRequestResponse(response);
             }
             sender.wakeup();
         }
         // Visibility for testing
-        Optional<Errors> checkResponseError(ClientResponse response, BiConsumer<Errors, Exception> errorConsumer) {
+        Optional<Errors> checkResponseError(ClientResponse response) {
             if (response.hasResponse()) {
                 return Optional.empty();
             }
@@ -411,22 +402,17 @@ public class PersisterStateManager {
             if (response.authenticationException() != null) {
                 log.error("Authentication exception", response.authenticationException());
                 Errors error = Errors.forException(response.authenticationException());
-                errorConsumer.accept(error, new AuthenticationException(String.format("Server response for %s indicates authentication exception.", this.partitionKey)));
                 return Optional.of(error);
             } else if (response.versionMismatch() != null) {
                 log.error("Version mismatch exception", response.versionMismatch());
                 Errors error = Errors.forException(response.versionMismatch());
-                errorConsumer.accept(error, new UnsupportedVersionException(String.format("Server response for %s indicates version mismatch.", this.partitionKey)));
                 return Optional.of(error);
-            } else if (response.wasDisconnected()) {
-                errorConsumer.accept(Errors.NETWORK_EXCEPTION, new NetworkException(String.format("Server response for %s indicates disconnect.", this.partitionKey)));
+            } else if (response.wasDisconnected()) { // Retriable
                 return Optional.of(Errors.NETWORK_EXCEPTION);
-            } else if (response.wasTimedOut()) {
-                log.error("Response for RPC {} with key {} timed out - {}.", name(), this.partitionKey, response);
-                errorConsumer.accept(Errors.REQUEST_TIMED_OUT, new NetworkException(String.format("Server response for %s indicates timeout.", this.partitionKey)));
+            } else if (response.wasTimedOut()) { // Retriable
+                log.debug("Response for RPC {} with key {} timed out - {}.", name(), this.partitionKey, response);
                 return Optional.of(Errors.REQUEST_TIMED_OUT);
             } else {
-                errorConsumer.accept(Errors.UNKNOWN_SERVER_ERROR, new NetworkException(String.format("Server did not provide any response for %s.", this.partitionKey)));
                 return Optional.of(Errors.UNKNOWN_SERVER_ERROR);
             }
         }
@@ -445,41 +431,63 @@ public class PersisterStateManager {
// Incrementing the number of find coordinator attempts
findCoordBackoff.incrementAttempt();
- List<FindCoordinatorResponseData.Coordinator> coordinators =
((FindCoordinatorResponse) response.responseBody()).coordinators();
- if (coordinators.size() != 1) {
- log.error("Find coordinator response for {} is invalid",
partitionKey());
- findCoordinatorErrorResponse(Errors.UNKNOWN_SERVER_ERROR, new
IllegalStateException("Invalid response with multiple coordinators."));
- return;
- }
-
- FindCoordinatorResponseData.Coordinator coordinatorData =
coordinators.get(0);
- Errors error = Errors.forCode(coordinatorData.errorCode());
- String errorMessage = coordinatorData.errorMessage();
- if (errorMessage == null || errorMessage.isEmpty()) {
- errorMessage = error.message();
- }
+ Errors clientResponseError =
checkResponseError(response).orElse(Errors.NONE);
+ String clientResponseErrorMessage = clientResponseError.message();
- switch (error) {
+ switch (clientResponseError) {
case NONE:
- log.debug("Find coordinator response valid. Enqueuing
actual request.");
- findCoordBackoff.resetAttempts();
- coordinatorNode = new Node(coordinatorData.nodeId(),
coordinatorData.host(), coordinatorData.port());
- // now we want the actual share state RPC call to happen
- if (this.isBatchable()) {
- addRequestToNodeMap(coordinatorNode, this);
- } else {
- enqueue(this);
+ List<FindCoordinatorResponseData.Coordinator> coordinators
= ((FindCoordinatorResponse) response.responseBody()).coordinators();
+ if (coordinators.size() != 1) {
+ log.error("Find coordinator response for {} is
invalid. Number of coordinators = {}", partitionKey(), coordinators.size());
+
findCoordinatorErrorResponse(Errors.UNKNOWN_SERVER_ERROR, new
IllegalStateException("Invalid response with multiple coordinators."));
+ return;
}
- break;
- case COORDINATOR_NOT_AVAILABLE: // retriable error codes
- case COORDINATOR_LOAD_IN_PROGRESS:
- case NOT_COORDINATOR:
- case UNKNOWN_TOPIC_OR_PARTITION:
- log.debug("Received retriable error in find coordinator
for {} using key {}: {}", name(), partitionKey(), errorMessage);
+ FindCoordinatorResponseData.Coordinator coordinatorData =
coordinators.get(0);
+ Errors error = Errors.forCode(coordinatorData.errorCode());
+ String errorMessage = coordinatorData.errorMessage();
+ if (errorMessage == null || errorMessage.isEmpty()) {
+ errorMessage = error.message();
+ }
+ switch (error) {
+ case NONE:
+ log.trace("Find coordinator response valid.
Enqueuing actual request.");
+ findCoordBackoff.resetAttempts();
+ coordinatorNode = new
Node(coordinatorData.nodeId(), coordinatorData.host(), coordinatorData.port());
+ // now we want the actual share state RPC call to
happen
+ if (this.isBatchable()) {
+ addRequestToNodeMap(coordinatorNode, this);
+ } else {
+ enqueue(this);
+ }
+ break;
+
+ case COORDINATOR_NOT_AVAILABLE: // retriable error
codes
+ case COORDINATOR_LOAD_IN_PROGRESS:
+ case NOT_COORDINATOR:
+ case UNKNOWN_TOPIC_OR_PARTITION:
+ log.debug("Received retriable error in find
coordinator for {} using key {}: {}", name(), partitionKey(), errorMessage);
+ if (!findCoordBackoff.canAttempt()) {
+ log.error("Exhausted max retries to find
coordinator for {} using key {} without success.", name(), partitionKey());
+ findCoordinatorErrorResponse(error, new
Exception("Exhausted max retries to find coordinator without success."));
+ break;
+ }
+ resetCoordinatorNode();
+ timer.add(new
PersisterTimerTask(findCoordBackoff.backOff(), this));
+ break;
+
+ default:
+ log.error("Unable to find coordinator for {} using
key {}: {}.", name(), partitionKey(), errorMessage);
+ findCoordinatorErrorResponse(error, new
Exception(errorMessage));
+ }
+ return;
+
+ case NETWORK_EXCEPTION: // Retriable client response error
codes.
+ case REQUEST_TIMED_OUT:
+ log.debug("Received retriable error in find coordinator
client response for {} using key {} due to {}.", name(), partitionKey(),
clientResponseErrorMessage);
if (!findCoordBackoff.canAttempt()) {
- log.error("Exhausted max retries to find coordinator
for {} using key {} without success.", name(), partitionKey());
- findCoordinatorErrorResponse(error, new
Exception("Exhausted max retries to find coordinator without success."));
+ log.error("Exhausted max retries to find coordinator
due to error in client response for {} using key {}.", name(), partitionKey());
+ findCoordinatorErrorResponse(clientResponseError, new
Exception("Exhausted max retries to find coordinator without success."));
break;
}
resetCoordinatorNode();
@@ -487,8 +495,8 @@ public class PersisterStateManager {
break;
default:
- log.error("Unable to find coordinator for {} using key
{}.", name(), partitionKey());
- findCoordinatorErrorResponse(error, new
Exception(errorMessage));
+ log.error("Unable to find coordinator due to error in
client response for {} using key {}: {}", name(), partitionKey(),
clientResponseError.code());
+ findCoordinatorErrorResponse(clientResponseError, new
Exception(clientResponseErrorMessage));
}
}
@@ -575,64 +583,86 @@ public class PersisterStateManager {
protected void handleRequestResponse(ClientResponse response) {
log.debug("Initialize state response received - {}", response);
initializeStateBackoff.incrementAttempt();
+ Errors clientResponseError =
checkResponseError(response).orElse(Errors.NONE);
+ String clientResponseErrorMessage = clientResponseError.message();
- // response can be a combined one for large number of requests
- // we need to deconstruct it
- InitializeShareGroupStateResponse combinedResponse =
(InitializeShareGroupStateResponse) response.responseBody();
-
- for (InitializeShareGroupStateResponseData.InitializeStateResult
initializeStateResult : combinedResponse.data().results()) {
- if
(initializeStateResult.topicId().equals(partitionKey().topicId())) {
-
Optional<InitializeShareGroupStateResponseData.PartitionResult>
partitionStateData =
-
initializeStateResult.partitions().stream().filter(partitionResult ->
partitionResult.partition() == partitionKey().partition())
- .findFirst();
-
- if (partitionStateData.isPresent()) {
- Errors error =
Errors.forCode(partitionStateData.get().errorCode());
- String errorMessage =
partitionStateData.get().errorMessage();
- if (errorMessage == null || errorMessage.isEmpty()) {
- errorMessage = error.message();
- }
+ switch (clientResponseError) {
+ case NONE:
+ // response can be a combined one for large number of
requests
+ // we need to deconstruct it
+ InitializeShareGroupStateResponse combinedResponse =
(InitializeShareGroupStateResponse) response.responseBody();
+
+ for
(InitializeShareGroupStateResponseData.InitializeStateResult
initializeStateResult : combinedResponse.data().results()) {
+ if
(initializeStateResult.topicId().equals(partitionKey().topicId())) {
+
Optional<InitializeShareGroupStateResponseData.PartitionResult>
partitionStateData =
+
initializeStateResult.partitions().stream().filter(partitionResult ->
partitionResult.partition() == partitionKey().partition())
+ .findFirst();
+
+ if (partitionStateData.isPresent()) {
+ Errors error =
Errors.forCode(partitionStateData.get().errorCode());
+ String errorMessage =
partitionStateData.get().errorMessage();
+ if (errorMessage == null ||
errorMessage.isEmpty()) {
+ errorMessage = error.message();
+ }
- switch (error) {
- case NONE:
- initializeStateBackoff.resetAttempts();
-
InitializeShareGroupStateResponseData.InitializeStateResult result =
InitializeShareGroupStateResponse.toResponseInitializeStateResult(
- partitionKey().topicId(),
- List.of(partitionStateData.get())
- );
- this.result.complete(new
InitializeShareGroupStateResponse(
- new
InitializeShareGroupStateResponseData().setResults(List.of(result))));
- return;
-
- // check retriable errors
- case COORDINATOR_NOT_AVAILABLE:
- case COORDINATOR_LOAD_IN_PROGRESS:
- case NOT_COORDINATOR:
- case UNKNOWN_TOPIC_OR_PARTITION:
- log.debug("Received retriable error in
initialize state RPC for key {}: {}", partitionKey(), errorMessage);
- if (!initializeStateBackoff.canAttempt()) {
- log.error("Exhausted max retries for
initialize state RPC for key {} without success.", partitionKey());
- requestErrorResponse(error, new
Exception("Exhausted max retries to complete initialize state RPC without
success."));
- return;
+ switch (error) {
+ case NONE:
+ initializeStateBackoff.resetAttempts();
+
InitializeShareGroupStateResponseData.InitializeStateResult result =
InitializeShareGroupStateResponse.toResponseInitializeStateResult(
+ partitionKey().topicId(),
+ List.of(partitionStateData.get())
+ );
+ this.result.complete(new
InitializeShareGroupStateResponse(
+ new
InitializeShareGroupStateResponseData().setResults(List.of(result))));
+ return;
+
+ // check retriable errors
+ case COORDINATOR_NOT_AVAILABLE:
+ case COORDINATOR_LOAD_IN_PROGRESS:
+ case NOT_COORDINATOR:
+ case UNKNOWN_TOPIC_OR_PARTITION:
+ log.debug("Received retriable error in
initialize state RPC for key {}: {}", partitionKey(), errorMessage);
+ if
(!initializeStateBackoff.canAttempt()) {
+ log.error("Exhausted max retries
for initialize state RPC for key {} without success.", partitionKey());
+ requestErrorResponse(error, new
Exception("Exhausted max retries to complete initialize state RPC without
success."));
+ return;
+ }
+ super.resetCoordinatorNode();
+ timer.add(new
PersisterTimerTask(initializeStateBackoff.backOff(), this));
+ return;
+
+ default:
+ log.error("Unable to perform
initialize state RPC for key {}: {}", partitionKey(), errorMessage);
+ requestErrorResponse(error, new
Exception(errorMessage));
+ return;
}
- super.resetCoordinatorNode();
- timer.add(new
PersisterTimerTask(initializeStateBackoff.backOff(), this));
- return;
-
- default:
- log.error("Unable to perform initialize state
RPC for key {}: {}", partitionKey(), errorMessage);
- requestErrorResponse(error, new
Exception(errorMessage));
- return;
+ }
}
}
- }
- }
- // no response found specific topic partition
- IllegalStateException exception = new IllegalStateException(
- "Failed to initialize state for share partition: " +
partitionKey()
- );
- requestErrorResponse(Errors.forException(exception), exception);
+ // no response found specific topic partition
+ IllegalStateException exception = new
IllegalStateException(
+ "Failed to initialize state for share partition: " +
partitionKey()
+ );
+ requestErrorResponse(Errors.forException(exception),
exception);
+ return;
+
+ case NETWORK_EXCEPTION: // Retriable client response error
codes.
+ case REQUEST_TIMED_OUT:
+ log.debug("Received retriable error in initialize state
RPC client response for key {}: {}", partitionKey(),
clientResponseErrorMessage);
+ if (!initializeStateBackoff.canAttempt()) {
+ log.error("Exhausted max retries for initialize state
RPC due to error in client response for key {}.", partitionKey());
+ requestErrorResponse(clientResponseError, new
Exception("Exhausted max retries to complete initialize state RPC without
success."));
+ return;
+ }
+ super.resetCoordinatorNode();
+ timer.add(new
PersisterTimerTask(initializeStateBackoff.backOff(), this));
+ return;
+
+ default:
+ log.error("Unable to perform initialize state RPC due to
error in client response for key {}: {}", partitionKey(),
clientResponseError.code());
+ requestErrorResponse(clientResponseError, new
Exception(clientResponseErrorMessage));
+ }
}
@Override
@@ -744,64 +774,85 @@ public class PersisterStateManager {
protected void handleRequestResponse(ClientResponse response) {
log.debug("Write state response received - {}", response);
writeStateBackoff.incrementAttempt();
+ Errors clientResponseError =
checkResponseError(response).orElse(Errors.NONE);
+ String clientResponseErrorMessage = clientResponseError.message();
+ switch (clientResponseError) {
+ case NONE:
+ // response can be a combined one for large number of
requests
+ // we need to deconstruct it
+ WriteShareGroupStateResponse combinedResponse =
(WriteShareGroupStateResponse) response.responseBody();
+
+ for (WriteShareGroupStateResponseData.WriteStateResult
writeStateResult : combinedResponse.data().results()) {
+ if
(writeStateResult.topicId().equals(partitionKey().topicId())) {
+
Optional<WriteShareGroupStateResponseData.PartitionResult> partitionStateData =
+
writeStateResult.partitions().stream().filter(partitionResult ->
partitionResult.partition() == partitionKey().partition())
+ .findFirst();
+
+ if (partitionStateData.isPresent()) {
+ Errors error =
Errors.forCode(partitionStateData.get().errorCode());
+ String errorMessage =
partitionStateData.get().errorMessage();
+ if (errorMessage == null ||
errorMessage.isEmpty()) {
+ errorMessage = error.message();
+ }
- // response can be a combined one for large number of requests
- // we need to deconstruct it
- WriteShareGroupStateResponse combinedResponse =
(WriteShareGroupStateResponse) response.responseBody();
-
- for (WriteShareGroupStateResponseData.WriteStateResult
writeStateResult : combinedResponse.data().results()) {
- if
(writeStateResult.topicId().equals(partitionKey().topicId())) {
- Optional<WriteShareGroupStateResponseData.PartitionResult>
partitionStateData =
-
writeStateResult.partitions().stream().filter(partitionResult ->
partitionResult.partition() == partitionKey().partition())
- .findFirst();
-
- if (partitionStateData.isPresent()) {
- Errors error =
Errors.forCode(partitionStateData.get().errorCode());
- String errorMessage =
partitionStateData.get().errorMessage();
- if (errorMessage == null || errorMessage.isEmpty()) {
- errorMessage = error.message();
- }
-
- switch (error) {
- case NONE:
- writeStateBackoff.resetAttempts();
-
WriteShareGroupStateResponseData.WriteStateResult result =
WriteShareGroupStateResponse.toResponseWriteStateResult(
- partitionKey().topicId(),
- List.of(partitionStateData.get())
- );
- this.result.complete(new
WriteShareGroupStateResponse(
- new
WriteShareGroupStateResponseData().setResults(List.of(result))));
- return;
-
- // check retriable errors
- case COORDINATOR_NOT_AVAILABLE:
- case COORDINATOR_LOAD_IN_PROGRESS:
- case NOT_COORDINATOR:
- case UNKNOWN_TOPIC_OR_PARTITION:
- log.debug("Received retriable error in write
state RPC for key {}: {}", partitionKey(), errorMessage);
- if (!writeStateBackoff.canAttempt()) {
- log.error("Exhausted max retries for write
state RPC for key {} without success.", partitionKey());
- requestErrorResponse(error, new
Exception("Exhausted max retries to complete write state RPC without
success."));
- return;
+ switch (error) {
+ case NONE:
+ writeStateBackoff.resetAttempts();
+
WriteShareGroupStateResponseData.WriteStateResult result =
WriteShareGroupStateResponse.toResponseWriteStateResult(
+ partitionKey().topicId(),
+ List.of(partitionStateData.get())
+ );
+ this.result.complete(new
WriteShareGroupStateResponse(
+ new
WriteShareGroupStateResponseData().setResults(List.of(result))));
+ return;
+
+ // check retriable errors
+ case COORDINATOR_NOT_AVAILABLE:
+ case COORDINATOR_LOAD_IN_PROGRESS:
+ case NOT_COORDINATOR:
+ case UNKNOWN_TOPIC_OR_PARTITION:
+ log.debug("Received retriable error in
write state RPC for key {}: {}", partitionKey(), errorMessage);
+ if (!writeStateBackoff.canAttempt()) {
+ log.error("Exhausted max retries
for write state RPC for key {} without success.", partitionKey());
+ requestErrorResponse(error, new
Exception("Exhausted max retries to complete write state RPC without
success."));
+ return;
+ }
+ super.resetCoordinatorNode();
+ timer.add(new
PersisterTimerTask(writeStateBackoff.backOff(), this));
+ return;
+
+ default:
+ log.error("Unable to perform write
state RPC for key {}: {}", partitionKey(), errorMessage);
+ requestErrorResponse(error, new
Exception(errorMessage));
+ return;
}
- super.resetCoordinatorNode();
- timer.add(new
PersisterTimerTask(writeStateBackoff.backOff(), this));
- return;
-
- default:
- log.error("Unable to perform write state RPC
for key {}: {}", partitionKey(), errorMessage);
- requestErrorResponse(error, new
Exception(errorMessage));
- return;
+ }
}
}
- }
- }
- // no response found specific topic partition
- IllegalStateException exception = new IllegalStateException(
- "Failed to write state for share partition: " + partitionKey()
- );
- requestErrorResponse(Errors.forException(exception), exception);
+ // no response found specific topic partition
+ IllegalStateException exception = new
IllegalStateException(
+ "Failed to write state for share partition: " +
partitionKey()
+ );
+ requestErrorResponse(Errors.forException(exception),
exception);
+ return;
+
+ case NETWORK_EXCEPTION: // Retriable client response error
codes.
+ case REQUEST_TIMED_OUT:
+ log.debug("Received retriable error in write state RPC
client response for key {}: {}", partitionKey(), clientResponseErrorMessage);
+ if (!writeStateBackoff.canAttempt()) {
+ log.error("Exhausted max retries for write state RPC
due to error in client response for key {}.", partitionKey());
+ requestErrorResponse(clientResponseError, new
Exception("Exhausted max retries to complete write state RPC without
success."));
+ return;
+ }
+ super.resetCoordinatorNode();
+ timer.add(new
PersisterTimerTask(writeStateBackoff.backOff(), this));
+ return;
+
+ default:
+ log.error("Unable to perform write state RPC due to error
in client response for key {}: {}", partitionKey(), clientResponseError.code());
+ requestErrorResponse(clientResponseError, new
Exception(clientResponseErrorMessage));
+ }
}
@Override
@@ -895,61 +946,83 @@ public class PersisterStateManager {
protected void handleRequestResponse(ClientResponse response) {
log.debug("Read state response received - {}", response);
readStateBackoff.incrementAttempt();
+ Errors clientResponseError =
checkResponseError(response).orElse(Errors.NONE);
+ String clientResponseErrorMessage = clientResponseError.message();
- ReadShareGroupStateResponse combinedResponse =
(ReadShareGroupStateResponse) response.responseBody();
- for (ReadShareGroupStateResponseData.ReadStateResult
readStateResult : combinedResponse.data().results()) {
- if
(readStateResult.topicId().equals(partitionKey().topicId())) {
- Optional<ReadShareGroupStateResponseData.PartitionResult>
partitionStateData =
-
readStateResult.partitions().stream().filter(partitionResult ->
partitionResult.partition() == partitionKey().partition())
- .findFirst();
-
- if (partitionStateData.isPresent()) {
- Errors error =
Errors.forCode(partitionStateData.get().errorCode());
- String errorMessage =
partitionStateData.get().errorMessage();
- if (errorMessage == null || errorMessage.isEmpty()) {
- errorMessage = error.message();
- }
+ switch (clientResponseError) {
+ case NONE:
+ ReadShareGroupStateResponse combinedResponse =
(ReadShareGroupStateResponse) response.responseBody();
+ for (ReadShareGroupStateResponseData.ReadStateResult
readStateResult : combinedResponse.data().results()) {
+ if
(readStateResult.topicId().equals(partitionKey().topicId())) {
+
Optional<ReadShareGroupStateResponseData.PartitionResult> partitionStateData =
+
readStateResult.partitions().stream().filter(partitionResult ->
partitionResult.partition() == partitionKey().partition())
+ .findFirst();
+
+ if (partitionStateData.isPresent()) {
+ Errors error =
Errors.forCode(partitionStateData.get().errorCode());
+ String errorMessage =
partitionStateData.get().errorMessage();
+ if (errorMessage == null ||
errorMessage.isEmpty()) {
+ errorMessage = error.message();
+ }
- switch (error) {
- case NONE:
- readStateBackoff.resetAttempts();
-
ReadShareGroupStateResponseData.ReadStateResult result =
ReadShareGroupStateResponse.toResponseReadStateResult(
- partitionKey().topicId(),
- List.of(partitionStateData.get())
- );
- this.result.complete(new
ReadShareGroupStateResponse(new ReadShareGroupStateResponseData()
- .setResults(List.of(result))));
- return;
-
- // check retriable errors
- case COORDINATOR_NOT_AVAILABLE:
- case COORDINATOR_LOAD_IN_PROGRESS:
- case NOT_COORDINATOR:
- case UNKNOWN_TOPIC_OR_PARTITION:
- log.debug("Received retriable error in read
state RPC for key {}: {}", partitionKey(), errorMessage);
- if (!readStateBackoff.canAttempt()) {
- log.error("Exhausted max retries for read
state RPC for key {} without success.", partitionKey());
- requestErrorResponse(error, new
Exception("Exhausted max retries to complete read state RPC without success."));
- return;
+ switch (error) {
+ case NONE:
+ readStateBackoff.resetAttempts();
+
ReadShareGroupStateResponseData.ReadStateResult result =
ReadShareGroupStateResponse.toResponseReadStateResult(
+ partitionKey().topicId(),
+ List.of(partitionStateData.get())
+ );
+ this.result.complete(new
ReadShareGroupStateResponse(new ReadShareGroupStateResponseData()
+ .setResults(List.of(result))));
+ return;
+
+ // check retriable errors
+ case COORDINATOR_NOT_AVAILABLE:
+ case COORDINATOR_LOAD_IN_PROGRESS:
+ case NOT_COORDINATOR:
+ case UNKNOWN_TOPIC_OR_PARTITION:
+ log.debug("Received retriable error in
read state RPC for key {}: {}", partitionKey(), errorMessage);
+ if (!readStateBackoff.canAttempt()) {
+ log.error("Exhausted max retries
for read state RPC for key {} without success.", partitionKey());
+ requestErrorResponse(error, new
Exception("Exhausted max retries to complete read state RPC without success."));
+ return;
+ }
+ super.resetCoordinatorNode();
+ timer.add(new
PersisterTimerTask(readStateBackoff.backOff(), this));
+ return;
+
+ default:
+ log.error("Unable to perform read
state RPC for key {}: {}", partitionKey(), errorMessage);
+ requestErrorResponse(error, new
Exception(errorMessage));
+ return;
}
- super.resetCoordinatorNode();
- timer.add(new
PersisterTimerTask(readStateBackoff.backOff(), this));
- return;
-
- default:
- log.error("Unable to perform read state RPC
for key {}: {}", partitionKey(), errorMessage);
- requestErrorResponse(error, new
Exception(errorMessage));
- return;
+ }
}
}
- }
- }
- // no response found specific topic partition
- IllegalStateException exception = new IllegalStateException(
- "Failed to read state for share partition " + partitionKey()
- );
- requestErrorResponse(Errors.forException(exception), exception);
+ // no response found specific topic partition
+ IllegalStateException exception = new
IllegalStateException(
+ "Failed to read state for share partition " +
partitionKey()
+ );
+ requestErrorResponse(Errors.forException(exception),
exception);
+ return;
+
+ case NETWORK_EXCEPTION: // Retriable client response error
codes.
+ case REQUEST_TIMED_OUT:
+ log.debug("Received retriable error in read state RPC
client response for key {}: {}", partitionKey(), clientResponseErrorMessage);
+ if (!readStateBackoff.canAttempt()) {
+ log.error("Exhausted max retries for read state RPC
due to error in client response for key {}.", partitionKey());
+ requestErrorResponse(clientResponseError, new
Exception("Exhausted max retries to complete read state RPC without success."));
+ return;
+ }
+ super.resetCoordinatorNode();
+ timer.add(new
PersisterTimerTask(readStateBackoff.backOff(), this));
+ return;
+
+ default:
+ log.error("Unable to perform read state RPC due to error
in client response for key {}: {}", partitionKey(), clientResponseError.code());
+ requestErrorResponse(clientResponseError, new
Exception(clientResponseErrorMessage));
+ }
}
@Override
@@ -1043,61 +1116,83 @@ public class PersisterStateManager {
protected void handleRequestResponse(ClientResponse response) {
log.debug("Read state summary response received - {}", response);
readStateSummaryBackoff.incrementAttempt();
+ Errors clientResponseError =
checkResponseError(response).orElse(Errors.NONE);
+ String clientResponseErrorMessage = clientResponseError.message();
- ReadShareGroupStateSummaryResponse combinedResponse =
(ReadShareGroupStateSummaryResponse) response.responseBody();
- for (ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult
readStateSummaryResult : combinedResponse.data().results()) {
- if
(readStateSummaryResult.topicId().equals(partitionKey().topicId())) {
-
Optional<ReadShareGroupStateSummaryResponseData.PartitionResult>
partitionStateData =
-
readStateSummaryResult.partitions().stream().filter(partitionResult ->
partitionResult.partition() == partitionKey().partition())
- .findFirst();
-
- if (partitionStateData.isPresent()) {
- Errors error =
Errors.forCode(partitionStateData.get().errorCode());
- String errorMessage =
partitionStateData.get().errorMessage();
- if (errorMessage == null || errorMessage.isEmpty()) {
- errorMessage = error.message();
- }
+ switch (clientResponseError) {
+ case NONE:
+ ReadShareGroupStateSummaryResponse combinedResponse =
(ReadShareGroupStateSummaryResponse) response.responseBody();
+ for
(ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult
readStateSummaryResult : combinedResponse.data().results()) {
+ if
(readStateSummaryResult.topicId().equals(partitionKey().topicId())) {
+
Optional<ReadShareGroupStateSummaryResponseData.PartitionResult>
partitionStateData =
+
readStateSummaryResult.partitions().stream().filter(partitionResult ->
partitionResult.partition() == partitionKey().partition())
+ .findFirst();
+
+ if (partitionStateData.isPresent()) {
+ Errors error =
Errors.forCode(partitionStateData.get().errorCode());
+ String errorMessage =
partitionStateData.get().errorMessage();
+ if (errorMessage == null ||
errorMessage.isEmpty()) {
+ errorMessage = error.message();
+ }
- switch (error) {
- case NONE:
- readStateSummaryBackoff.resetAttempts();
-
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult result =
ReadShareGroupStateSummaryResponse.toResponseReadStateSummaryResult(
- partitionKey().topicId(),
- List.of(partitionStateData.get())
- );
- this.result.complete(new
ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData()
- .setResults(List.of(result))));
- return;
-
- // check retriable errors
- case COORDINATOR_NOT_AVAILABLE:
- case COORDINATOR_LOAD_IN_PROGRESS:
- case NOT_COORDINATOR:
- case UNKNOWN_TOPIC_OR_PARTITION:
- log.debug("Received retriable error in read
state summary RPC for key {}: {}", partitionKey(), errorMessage);
- if (!readStateSummaryBackoff.canAttempt()) {
- log.error("Exhausted max retries for read
state summary RPC for key {} without success.", partitionKey());
- requestErrorResponse(error, new
Exception("Exhausted max retries to complete read state summary RPC without
success."));
- return;
+ switch (error) {
+ case NONE:
+
readStateSummaryBackoff.resetAttempts();
+
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult result =
ReadShareGroupStateSummaryResponse.toResponseReadStateSummaryResult(
+ partitionKey().topicId(),
+ List.of(partitionStateData.get())
+ );
+ this.result.complete(new
ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData()
+ .setResults(List.of(result))));
+ return;
+
+ // check retriable errors
+ case COORDINATOR_NOT_AVAILABLE:
+ case COORDINATOR_LOAD_IN_PROGRESS:
+ case NOT_COORDINATOR:
+ case UNKNOWN_TOPIC_OR_PARTITION:
+ log.debug("Received retriable error in
read state summary RPC for key {}: {}", partitionKey(), errorMessage);
+ if
(!readStateSummaryBackoff.canAttempt()) {
+ log.error("Exhausted max retries
for read state summary RPC for key {} without success.", partitionKey());
+ requestErrorResponse(error, new
Exception("Exhausted max retries to complete read state summary RPC without
success."));
+ return;
+ }
+ super.resetCoordinatorNode();
+ timer.add(new
PersisterTimerTask(readStateSummaryBackoff.backOff(), this));
+ return;
+
+ default:
+ log.error("Unable to perform read
state summary RPC for key {}: {}", partitionKey(), errorMessage);
+ requestErrorResponse(error, new
Exception(errorMessage));
+ return;
}
- super.resetCoordinatorNode();
- timer.add(new
PersisterTimerTask(readStateSummaryBackoff.backOff(), this));
- return;
-
- default:
- log.error("Unable to perform read state
summary RPC for key {}: {}", partitionKey(), errorMessage);
- requestErrorResponse(error, new
Exception(errorMessage));
- return;
+ }
}
}
- }
- }
- // no response found specific topic partition
- IllegalStateException exception = new IllegalStateException(
- "Failed to read state summary for share partition " +
partitionKey()
- );
- requestErrorResponse(Errors.forException(exception), exception);
+ // no response found specific topic partition
+ IllegalStateException exception = new
IllegalStateException(
+ "Failed to read state summary for share partition " +
partitionKey()
+ );
+ requestErrorResponse(Errors.forException(exception),
exception);
+ return;
+
+ case NETWORK_EXCEPTION: // Retriable client response error
codes.
+ case REQUEST_TIMED_OUT:
+ log.debug("Received retriable error in read state summary
RPC client response for key {}: {}", partitionKey(),
clientResponseErrorMessage);
+ if (!readStateSummaryBackoff.canAttempt()) {
+ log.error("Exhausted max retries for read state
summary RPC due to error in client response for key {}.", partitionKey());
+ requestErrorResponse(clientResponseError, new
Exception("Exhausted max retries to complete read state summary RPC without
success."));
+ return;
+ }
+ super.resetCoordinatorNode();
+ timer.add(new
PersisterTimerTask(readStateSummaryBackoff.backOff(), this));
+ return;
+
+ default:
+ log.error("Unable to perform read state summary RPC due to
error in client response for key {}: {}", partitionKey(),
clientResponseError.code());
+ requestErrorResponse(clientResponseError, new
Exception(clientResponseErrorMessage));
+ }
}
@Override
@@ -1184,65 +1279,87 @@ public class PersisterStateManager {
protected void handleRequestResponse(ClientResponse response) {
log.debug("Delete state response received - {}", response);
deleteStateBackoff.incrementAttempt();
+ Errors clientResponseError =
checkResponseError(response).orElse(Errors.NONE);
+ String clientResponseErrorMessage = clientResponseError.message();
- // response can be a combined one for large number of requests
- // we need to deconstruct it
- DeleteShareGroupStateResponse combinedResponse =
(DeleteShareGroupStateResponse) response.responseBody();
-
- for (DeleteShareGroupStateResponseData.DeleteStateResult
deleteStateResult : combinedResponse.data().results()) {
- if
(deleteStateResult.topicId().equals(partitionKey().topicId())) {
-
Optional<DeleteShareGroupStateResponseData.PartitionResult> partitionStateData =
- deleteStateResult.partitions().stream()
- .filter(partitionResult ->
partitionResult.partition() == partitionKey().partition())
- .findFirst();
-
- if (partitionStateData.isPresent()) {
- Errors error =
Errors.forCode(partitionStateData.get().errorCode());
- String errorMessage =
partitionStateData.get().errorMessage();
- if (errorMessage == null || errorMessage.isEmpty()) {
- errorMessage = error.message();
- }
+ switch (clientResponseError) {
+ case NONE:
+ // response can be a combined one for large number of
requests
+ // we need to deconstruct it
+ DeleteShareGroupStateResponse combinedResponse =
(DeleteShareGroupStateResponse) response.responseBody();
+
+ for (DeleteShareGroupStateResponseData.DeleteStateResult
deleteStateResult : combinedResponse.data().results()) {
+ if
(deleteStateResult.topicId().equals(partitionKey().topicId())) {
+
Optional<DeleteShareGroupStateResponseData.PartitionResult> partitionStateData =
+ deleteStateResult.partitions().stream()
+ .filter(partitionResult ->
partitionResult.partition() == partitionKey().partition())
+ .findFirst();
+
+ if (partitionStateData.isPresent()) {
+ Errors error =
Errors.forCode(partitionStateData.get().errorCode());
+ String errorMessage =
partitionStateData.get().errorMessage();
+ if (errorMessage == null ||
errorMessage.isEmpty()) {
+ errorMessage = error.message();
+ }
- switch (error) {
- case NONE:
- deleteStateBackoff.resetAttempts();
-
DeleteShareGroupStateResponseData.DeleteStateResult result =
DeleteShareGroupStateResponse.toResponseDeleteStateResult(
- partitionKey().topicId(),
- List.of(partitionStateData.get())
- );
- this.result.complete(new
DeleteShareGroupStateResponse(
- new
DeleteShareGroupStateResponseData().setResults(List.of(result))));
- return;
-
- // check retriable errors
- case COORDINATOR_NOT_AVAILABLE:
- case COORDINATOR_LOAD_IN_PROGRESS:
- case NOT_COORDINATOR:
- case UNKNOWN_TOPIC_OR_PARTITION:
- log.debug("Received retriable error in delete
state RPC for key {}: {}", partitionKey(), errorMessage);
- if (!deleteStateBackoff.canAttempt()) {
- log.error("Exhausted max retries for
delete state RPC for key {} without success.", partitionKey());
- requestErrorResponse(error, new
Exception("Exhausted max retries to complete delete state RPC without
success."));
- return;
+ switch (error) {
+ case NONE:
+ deleteStateBackoff.resetAttempts();
+
DeleteShareGroupStateResponseData.DeleteStateResult result =
DeleteShareGroupStateResponse.toResponseDeleteStateResult(
+ partitionKey().topicId(),
+ List.of(partitionStateData.get())
+ );
+ this.result.complete(new
DeleteShareGroupStateResponse(
+ new
DeleteShareGroupStateResponseData().setResults(List.of(result))));
+ return;
+
+ // check retriable errors
+ case COORDINATOR_NOT_AVAILABLE:
+ case COORDINATOR_LOAD_IN_PROGRESS:
+ case NOT_COORDINATOR:
+ case UNKNOWN_TOPIC_OR_PARTITION:
+ log.debug("Received retriable error in
delete state RPC for key {}: {}", partitionKey(), errorMessage);
+ if (!deleteStateBackoff.canAttempt()) {
+ log.error("Exhausted max retries
for delete state RPC for key {} without success.", partitionKey());
+ requestErrorResponse(error, new
Exception("Exhausted max retries to complete delete state RPC without
success."));
+ return;
+ }
+ super.resetCoordinatorNode();
+ timer.add(new
PersisterTimerTask(deleteStateBackoff.backOff(), this));
+ return;
+
+ default:
+ log.error("Unable to perform delete
state RPC for key {}: {}", partitionKey(), errorMessage);
+ requestErrorResponse(error, new
Exception(errorMessage));
+ return;
}
- super.resetCoordinatorNode();
- timer.add(new
PersisterTimerTask(deleteStateBackoff.backOff(), this));
- return;
-
- default:
- log.error("Unable to perform delete state RPC
for key {}: {}", partitionKey(), errorMessage);
- requestErrorResponse(error, new
Exception(errorMessage));
- return;
+ }
}
}
- }
- }
- // no response found specific topic partition
- IllegalStateException exception = new IllegalStateException(
- "Failed to delete state for share partition: " + partitionKey()
- );
- requestErrorResponse(Errors.forException(exception), exception);
+ // no response found specific topic partition
+ IllegalStateException exception = new
IllegalStateException(
+ "Failed to delete state for share partition: " +
partitionKey()
+ );
+ requestErrorResponse(Errors.forException(exception),
exception);
+ return;
+
+ case NETWORK_EXCEPTION: // Retriable client response error
codes.
+ case REQUEST_TIMED_OUT:
+ log.debug("Received retriable error in delete state RPC
client response for key {}: {}", partitionKey(), clientResponseErrorMessage);
+ if (!deleteStateBackoff.canAttempt()) {
+ log.error("Exhausted max retries for delete state RPC
due to error in client response for key {}.", partitionKey());
+ requestErrorResponse(clientResponseError, new
Exception("Exhausted max retries to complete delete state RPC without
success."));
+ return;
+ }
+ super.resetCoordinatorNode();
+ timer.add(new
PersisterTimerTask(deleteStateBackoff.backOff(), this));
+ return;
+
+ default:
+ log.error("Unable to perform delete state RPC due to error
in client response for key {}: {}", partitionKey(), clientResponseError.code());
+ requestErrorResponse(clientResponseError, new
Exception(clientResponseErrorMessage));
+ }
}
@Override
diff --git
a/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
b/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
index bdce1713817..bd4f87ae19d 100644
---
a/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
@@ -466,7 +466,7 @@ class PersisterStateManagerTest {
}
assertEquals(Errors.NETWORK_EXCEPTION.code(),
result.data().results().get(0).partitions().get(0).errorCode());
- verify(handler, times(1)).findShareCoordinatorBuilder();
+ verify(handler, times(5)).findShareCoordinatorBuilder(); //
Retriable exception
try {
// Stopping the state manager
@@ -1373,6 +1373,77 @@ class PersisterStateManagerTest {
}
}
+ @Test
+ public void testWriteStateClientResponseErrorRetriesExhausted() {
+ MockClient client = new MockClient(MOCK_TIME);
+
+ String groupId = "group1";
+ Uuid topicId = Uuid.randomUuid();
+ int partition = 10;
+ List<PersisterStateBatch> stateBatches = List.of(
+ new PersisterStateBatch(0, 9, (byte) 0, (short) 1),
+ new PersisterStateBatch(10, 19, (byte) 1, (short) 1)
+ );
+
+ Node coordinatorNode = new Node(1, HOST, PORT);
+ client.setUnreachable(coordinatorNode,
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS + 1);
+
+ ShareCoordinatorMetadataCacheHelper cacheHelper =
getCoordinatorCacheHelper(coordinatorNode);
+
+ PersisterStateManager stateManager =
PersisterStateManagerBuilder.builder()
+ .withKafkaClient(client)
+ .withTimer(mockTimer)
+ .withCacheHelper(cacheHelper)
+ .build();
+
+ stateManager.start();
+
+ CompletableFuture<WriteShareGroupStateResponse> future = new
CompletableFuture<>();
+
+ PersisterStateManager.WriteStateHandler handler = spy(stateManager.new
WriteStateHandler(
+ groupId,
+ topicId,
+ partition,
+ 0,
+ 0,
+ 0,
+ 0,
+ stateBatches,
+ future,
+ REQUEST_BACKOFF_MS,
+ REQUEST_BACKOFF_MAX_MS,
+ 2
+ ));
+
+ stateManager.enqueue(handler);
+
+ CompletableFuture<WriteShareGroupStateResponse> resultFuture =
handler.result();
+
+ WriteShareGroupStateResponse result = null;
+ try {
+ result = resultFuture.get();
+ } catch (Exception e) {
+ fail("Failed to get result from future", e);
+ }
+
+ WriteShareGroupStateResponseData.PartitionResult partitionResult =
result.data().results().get(0).partitions().get(0);
+
+ // Verifying the coordinator node was populated correctly by the
FIND_COORDINATOR request
+ assertEquals(coordinatorNode, handler.getCoordinatorNode());
+
+ // Verifying the result returned is correct
+ assertEquals(partition, partitionResult.partition());
+ assertEquals(Errors.NETWORK_EXCEPTION.code(),
partitionResult.errorCode());
+ verify(handler, times(2)).handleRequestResponse(any()); //
Retriable exception
+
+ try {
+ // Stopping the state manager
+ stateManager.stop();
+ } catch (Exception e) {
+ fail("Failed to stop state manager", e);
+ }
+ }
+
@Test
public void testWriteStateRequestBatchingWithCoordinatorNodeLookup()
throws ExecutionException, Exception {
MockClient client = new MockClient(MOCK_TIME);
@@ -2234,6 +2305,70 @@ class PersisterStateManagerTest {
}
}
+ @Test
+ public void testReadStateClientResponseErrorRetriesExhausted() {
+ MockClient client = new MockClient(MOCK_TIME);
+
+ String groupId = "group1";
+ Uuid topicId = Uuid.randomUuid();
+ int partition = 10;
+
+ Node coordinatorNode = new Node(1, HOST, PORT);
+ client.setUnreachable(coordinatorNode,
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS + 1);
+
+ ShareCoordinatorMetadataCacheHelper cacheHelper =
getCoordinatorCacheHelper(coordinatorNode);
+
+ PersisterStateManager stateManager =
PersisterStateManagerBuilder.builder()
+ .withKafkaClient(client)
+ .withTimer(mockTimer)
+ .withCacheHelper(cacheHelper)
+ .build();
+
+ stateManager.start();
+
+ CompletableFuture<ReadShareGroupStateResponse> future = new
CompletableFuture<>();
+
+ PersisterStateManager.ReadStateHandler handler = spy(stateManager.new
ReadStateHandler(
+ groupId,
+ topicId,
+ partition,
+ 0,
+ future,
+ REQUEST_BACKOFF_MS,
+ REQUEST_BACKOFF_MAX_MS,
+ 2,
+ null
+ ));
+
+ stateManager.enqueue(handler);
+
+ CompletableFuture<ReadShareGroupStateResponse> resultFuture =
handler.result();
+
+ ReadShareGroupStateResponse result = null;
+ try {
+ result = resultFuture.get();
+ } catch (Exception e) {
+ fail("Failed to get result from future", e);
+ }
+
+ ReadShareGroupStateResponseData.PartitionResult partitionResult =
result.data().results().get(0).partitions().get(0);
+
+ // Verifying the coordinator node was populated correctly by the
FIND_COORDINATOR request
+ assertEquals(coordinatorNode, handler.getCoordinatorNode());
+
+ // Verifying the result returned is correct
+ assertEquals(partition, partitionResult.partition());
+ assertEquals(Errors.NETWORK_EXCEPTION.code(),
partitionResult.errorCode());
+ verify(handler, times(2)).handleRequestResponse(any()); //
Retriable exception
+
+ try {
+ // Stopping the state manager
+ stateManager.stop();
+ } catch (Exception e) {
+ fail("Failed to stop state manager", e);
+ }
+ }
+
@Test
public void testReadStateSummaryRequestCoordinatorFoundSuccessfully() {
MockClient client = new MockClient(MOCK_TIME);
@@ -3004,6 +3139,70 @@ class PersisterStateManagerTest {
}
}
+ @Test
+ public void testReadStateSummaryClientResponseErrorRetriesExhausted() {
+ MockClient client = new MockClient(MOCK_TIME);
+
+ String groupId = "group1";
+ Uuid topicId = Uuid.randomUuid();
+ int partition = 10;
+
+ Node coordinatorNode = new Node(1, HOST, PORT);
+ client.setUnreachable(coordinatorNode,
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS + 1);
+
+ ShareCoordinatorMetadataCacheHelper cacheHelper =
getCoordinatorCacheHelper(coordinatorNode);
+
+ PersisterStateManager stateManager =
PersisterStateManagerBuilder.builder()
+ .withKafkaClient(client)
+ .withTimer(mockTimer)
+ .withCacheHelper(cacheHelper)
+ .build();
+
+ stateManager.start();
+
+ CompletableFuture<ReadShareGroupStateSummaryResponse> future = new
CompletableFuture<>();
+
+ PersisterStateManager.ReadStateSummaryHandler handler =
spy(stateManager.new ReadStateSummaryHandler(
+ groupId,
+ topicId,
+ partition,
+ 0,
+ future,
+ REQUEST_BACKOFF_MS,
+ REQUEST_BACKOFF_MAX_MS,
+ 2,
+ null
+ ));
+
+ stateManager.enqueue(handler);
+
+ CompletableFuture<ReadShareGroupStateSummaryResponse> resultFuture =
handler.result();
+
+ ReadShareGroupStateSummaryResponse result = null;
+ try {
+ result = resultFuture.get();
+ } catch (Exception e) {
+ fail("Failed to get result from future", e);
+ }
+
+ ReadShareGroupStateSummaryResponseData.PartitionResult partitionResult
= result.data().results().get(0).partitions().get(0);
+
+ // Verifying the coordinator node was populated correctly by the
FIND_COORDINATOR request
+ assertEquals(coordinatorNode, handler.getCoordinatorNode());
+
+ // Verifying the result returned is correct
+ assertEquals(partition, partitionResult.partition());
+ assertEquals(Errors.NETWORK_EXCEPTION.code(),
partitionResult.errorCode());
+ verify(handler, times(2)).handleRequestResponse(any()); //
Retriable exception
+
+ try {
+ // Stopping the state manager
+ stateManager.stop();
+ } catch (Exception e) {
+ fail("Failed to stop state manager", e);
+ }
+ }
+
@Test
public void testDeleteStateRequestCoordinatorFoundSuccessfully() {
MockClient client = new MockClient(MOCK_TIME);
@@ -3726,6 +3925,68 @@ class PersisterStateManagerTest {
TestUtils.waitForCondition(isBatchingSuccess::get,
TestUtils.DEFAULT_MAX_WAIT_MS, 10L, () -> "unable to verify batching");
}
+ @Test
+ public void testDeleteStateClientResponseErrorRetriesExhausted() {
+ MockClient client = new MockClient(MOCK_TIME);
+
+ String groupId = "group1";
+ Uuid topicId = Uuid.randomUuid();
+ int partition = 10;
+
+ Node coordinatorNode = new Node(1, HOST, PORT);
+ client.setUnreachable(coordinatorNode,
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS + 1);
+
+ ShareCoordinatorMetadataCacheHelper cacheHelper =
getCoordinatorCacheHelper(coordinatorNode);
+
+ PersisterStateManager stateManager =
PersisterStateManagerBuilder.builder()
+ .withKafkaClient(client)
+ .withTimer(mockTimer)
+ .withCacheHelper(cacheHelper)
+ .build();
+
+ stateManager.start();
+
+ CompletableFuture<DeleteShareGroupStateResponse> future = new
CompletableFuture<>();
+
+ PersisterStateManager.DeleteStateHandler handler =
spy(stateManager.new DeleteStateHandler(
+ groupId,
+ topicId,
+ partition,
+ future,
+ REQUEST_BACKOFF_MS,
+ REQUEST_BACKOFF_MAX_MS,
+ 2
+ ));
+
+ stateManager.enqueue(handler);
+
+ CompletableFuture<DeleteShareGroupStateResponse> resultFuture =
handler.result();
+
+ DeleteShareGroupStateResponse result = null;
+ try {
+ result = resultFuture.get();
+ } catch (Exception e) {
+ fail("Failed to get result from future", e);
+ }
+
+ DeleteShareGroupStateResponseData.PartitionResult partitionResult =
result.data().results().get(0).partitions().get(0);
+
+ // Verifying the coordinator node was populated correctly by the
FIND_COORDINATOR request
+ assertEquals(coordinatorNode, handler.getCoordinatorNode());
+
+ // Verifying the result returned is correct
+ assertEquals(partition, partitionResult.partition());
+ assertEquals(Errors.NETWORK_EXCEPTION.code(),
partitionResult.errorCode());
+ verify(handler, times(2)).handleRequestResponse(any()); //
Retriable exception
+
+ try {
+ // Stopping the state manager
+ stateManager.stop();
+ } catch (Exception e) {
+ fail("Failed to stop state manager", e);
+ }
+ }
+
@Test
public void testInitializeStateRequestCoordinatorFoundSuccessfully() {
MockClient client = new MockClient(MOCK_TIME);
@@ -4476,6 +4737,70 @@ class PersisterStateManagerTest {
TestUtils.waitForCondition(isBatchingSuccess::get,
TestUtils.DEFAULT_MAX_WAIT_MS, 10L, () -> "unable to verify batching");
}
+ @Test
+ public void testInitializeStateClientResponseErrorRetriesExhausted() {
+ MockClient client = new MockClient(MOCK_TIME);
+
+ String groupId = "group1";
+ Uuid topicId = Uuid.randomUuid();
+ int partition = 10;
+
+ Node coordinatorNode = new Node(1, HOST, PORT);
+ client.setUnreachable(coordinatorNode,
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS + 1);
+
+ ShareCoordinatorMetadataCacheHelper cacheHelper =
getCoordinatorCacheHelper(coordinatorNode);
+
+ PersisterStateManager stateManager =
PersisterStateManagerBuilder.builder()
+ .withKafkaClient(client)
+ .withTimer(mockTimer)
+ .withCacheHelper(cacheHelper)
+ .build();
+
+ stateManager.start();
+
+ CompletableFuture<InitializeShareGroupStateResponse> future = new
CompletableFuture<>();
+
+ PersisterStateManager.InitializeStateHandler handler =
spy(stateManager.new InitializeStateHandler(
+ groupId,
+ topicId,
+ partition,
+ 0,
+ 0L,
+ future,
+ REQUEST_BACKOFF_MS,
+ REQUEST_BACKOFF_MAX_MS,
+ 2
+ ));
+
+ stateManager.enqueue(handler);
+
+ CompletableFuture<InitializeShareGroupStateResponse> resultFuture =
handler.result();
+
+ InitializeShareGroupStateResponse result = null;
+ try {
+ result = resultFuture.get();
+ } catch (Exception e) {
+ fail("Failed to get result from future", e);
+ }
+
+ InitializeShareGroupStateResponseData.PartitionResult partitionResult
= result.data().results().get(0).partitions().get(0);
+
+ // Verifying the coordinator node was populated correctly by the
FIND_COORDINATOR request
+ assertEquals(coordinatorNode, handler.getCoordinatorNode());
+
+ // Verifying the result returned is correct
+ assertEquals(partition, partitionResult.partition());
+ assertEquals(Errors.NETWORK_EXCEPTION.code(),
partitionResult.errorCode());
+ verify(handler, times(2)).handleRequestResponse(any()); //
Retriable exception
+
+ try {
+ // Stopping the state manager
+ stateManager.stop();
+ } catch (Exception e) {
+ fail("Failed to stop state manager", e);
+ }
+ }
+
@Test
public void testPersisterStateManagerClose() {
KafkaClient client = mock(KafkaClient.class);
@@ -4600,7 +4925,6 @@ class PersisterStateManagerTest {
when(response.wasTimedOut()).thenReturn(holder.wasTimedOut);
when(response.authenticationException()).thenReturn(holder.authException ? new
SaslAuthenticationException("bad stuff") : null);
when(response.versionMismatch()).thenReturn(holder.versionMismatch ?
new UnsupportedVersionException("worse stuff") : null);
- assertEquals(holder.exp, handler.checkResponseError(response, (err,
exp) -> {
- }), holder.toString());
+ assertEquals(holder.exp, handler.checkResponseError(response),
holder.toString());
}
}