This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a0292bae399 KAFKA-17463 add retry in Persister for NOT_COORDINATOR error (#17645)
a0292bae399 is described below
commit a0292bae39941ee5ca89380744129bea1db2aa6c
Author: Apoorv Mittal <[email protected]>
AuthorDate: Mon Nov 4 13:06:39 2024 +0000
KAFKA-17463 add retry in Persister for NOT_COORDINATOR error (#17645)
Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../share/persister/PersisterStateManager.java | 3 +
.../share/persister/PersisterStateManagerTest.java | 67 ++++++++++++++--------
2 files changed, 47 insertions(+), 23 deletions(-)
diff --git
a/share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
b/share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
index 1f0fddf7983..45bbf52a39d 100644
---
a/share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
+++
b/share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
@@ -401,6 +401,7 @@ public class PersisterStateManager {
case COORDINATOR_NOT_AVAILABLE: // retryable error codes
case COORDINATOR_LOAD_IN_PROGRESS:
+ case NOT_COORDINATOR:
log.warn("Received retryable error in find coordinator:
{}", error.message());
if (!findCoordBackoff.canAttempt()) {
log.error("Exhausted max retries to find coordinator
without success.");
@@ -537,6 +538,7 @@ public class PersisterStateManager {
// check retryable errors
case COORDINATOR_NOT_AVAILABLE:
case COORDINATOR_LOAD_IN_PROGRESS:
+ case NOT_COORDINATOR:
log.warn("Received retryable error in write
state RPC: {}", error.message());
if (!writeStateBackoff.canAttempt()) {
log.error("Exhausted max retries for write
state RPC without success.");
@@ -679,6 +681,7 @@ public class PersisterStateManager {
// check retryable errors
case COORDINATOR_NOT_AVAILABLE:
case COORDINATOR_LOAD_IN_PROGRESS:
+ case NOT_COORDINATOR:
log.warn("Received retryable error in read
state RPC: {}", error.message());
if (!readStateBackoff.canAttempt()) {
log.error("Exhausted max retries for read
state RPC without success.");
diff --git
a/share/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
b/share/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
index edcebd0d3d2..464dd4c0517 100644
---
a/share/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
+++
b/share/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
@@ -625,7 +625,7 @@ class PersisterStateManagerTest {
}
@Test
- public void testWriteStateRequestFailButCoordinatorSuccess() {
+ public void
testWriteStateRequestRetryWithNotCoordinatorSuccessfulOnRetry() throws
InterruptedException, ExecutionException {
MockClient client = new MockClient(MOCK_TIME);
String groupId = "group1";
@@ -641,6 +641,19 @@ class PersisterStateManagerTest {
String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId,
topicId, partition);
+ client.prepareResponseFrom(body -> body instanceof
FindCoordinatorRequest
+ && ((FindCoordinatorRequest) body).data().keyType() ==
FindCoordinatorRequest.CoordinatorType.SHARE.id()
+ && ((FindCoordinatorRequest)
body).data().coordinatorKeys().get(0).equals(coordinatorKey),
+ new FindCoordinatorResponse(
+ new FindCoordinatorResponseData()
+ .setCoordinators(Collections.singletonList(
+ new FindCoordinatorResponseData.Coordinator()
+ .setErrorCode(Errors.NOT_COORDINATOR.code())
+ ))
+ ),
+ suppliedNode
+ );
+
client.prepareResponseFrom(body -> body instanceof
FindCoordinatorRequest
&& ((FindCoordinatorRequest) body).data().keyType() ==
FindCoordinatorRequest.CoordinatorType.SHARE.id()
&& ((FindCoordinatorRequest)
body).data().coordinatorKeys().get(0).equals(coordinatorKey),
@@ -672,8 +685,8 @@ class PersisterStateManagerTest {
.setPartitions(Collections.singletonList(
new
WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
- .setErrorCode(Errors.NOT_COORDINATOR.code())
-
.setErrorMessage(Errors.NOT_COORDINATOR.message())
+ .setErrorCode(Errors.NONE.code())
+ .setErrorMessage("")
))
))
), coordinatorNode);
@@ -708,16 +721,12 @@ class PersisterStateManagerTest {
CompletableFuture<WriteShareGroupStateResponse> resultFuture =
handler.result();
- WriteShareGroupStateResponse result = null;
- try {
- result = resultFuture.get();
- } catch (Exception e) {
- fail("Failed to get result from future", e);
- }
+ TestUtils.waitForCondition(resultFuture::isDone,
TestUtils.DEFAULT_MAX_WAIT_MS, 10L, () -> "Failed to get result from future");
+ WriteShareGroupStateResponse result = resultFuture.get();
WriteShareGroupStateResponseData.PartitionResult partitionResult =
result.data().results().get(0).partitions().get(0);
- verify(handler, times(1)).findShareCoordinatorBuilder();
+ verify(handler, times(2)).findShareCoordinatorBuilder();
verify(handler, times(0)).requestBuilder();
// Verifying the coordinator node was populated correctly by the
FIND_COORDINATOR request
@@ -725,7 +734,7 @@ class PersisterStateManagerTest {
// Verifying the result returned is correct
assertEquals(partition, partitionResult.partition());
- assertEquals(Errors.NOT_COORDINATOR.code(),
partitionResult.errorCode());
+ assertEquals(Errors.NONE.code(), partitionResult.errorCode());
try {
// Stopping the state manager
@@ -1500,7 +1509,7 @@ class PersisterStateManagerTest {
}
@Test
- public void testReadStateRequestFailButCoordinatorFoundSuccessfully() {
+ public void testReadStateRequestRetryWithNotCoordinatorSuccessfulOnRetry()
throws ExecutionException, InterruptedException {
MockClient client = new MockClient(MOCK_TIME);
String groupId = "group1";
@@ -1512,6 +1521,19 @@ class PersisterStateManagerTest {
String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId,
topicId, partition);
+ client.prepareResponseFrom(body -> body instanceof
FindCoordinatorRequest
+ && ((FindCoordinatorRequest) body).data().keyType() ==
FindCoordinatorRequest.CoordinatorType.SHARE.id()
+ && ((FindCoordinatorRequest)
body).data().coordinatorKeys().get(0).equals(coordinatorKey),
+ new FindCoordinatorResponse(
+ new FindCoordinatorResponseData()
+ .setCoordinators(Collections.singletonList(
+ new FindCoordinatorResponseData.Coordinator()
+ .setErrorCode(Errors.NOT_COORDINATOR.code())
+ ))
+ ),
+ suppliedNode
+ );
+
client.prepareResponseFrom(body -> body instanceof
FindCoordinatorRequest
&& ((FindCoordinatorRequest) body).data().keyType() ==
FindCoordinatorRequest.CoordinatorType.SHARE.id()
&& ((FindCoordinatorRequest)
body).data().coordinatorKeys().get(0).equals(coordinatorKey),
@@ -1543,8 +1565,11 @@ class PersisterStateManagerTest {
.setPartitions(Collections.singletonList(
new
ReadShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
- .setErrorCode(Errors.NOT_COORDINATOR.code())
-
.setErrorMessage(Errors.NOT_COORDINATOR.message())
+ .setErrorCode(Errors.NONE.code())
+ .setErrorMessage("")
+ .setStateEpoch(1)
+ .setStartOffset(0)
+ .setStateBatches(Collections.emptyList())
))
))
), coordinatorNode);
@@ -1577,16 +1602,12 @@ class PersisterStateManagerTest {
CompletableFuture<ReadShareGroupStateResponse> resultFuture =
handler.result();
- ReadShareGroupStateResponse result = null;
- try {
- result = resultFuture.get();
- } catch (Exception e) {
- fail("Failed to get result from future", e);
- }
+ TestUtils.waitForCondition(resultFuture::isDone,
TestUtils.DEFAULT_MAX_WAIT_MS, 10L, () -> "Failed to get result from future");
+ ReadShareGroupStateResponse result = resultFuture.get();
ReadShareGroupStateResponseData.PartitionResult partitionResult =
result.data().results().get(0).partitions().get(0);
- verify(handler, times(1)).findShareCoordinatorBuilder();
+ verify(handler, times(2)).findShareCoordinatorBuilder();
verify(handler, times(0)).requestBuilder();
// Verifying the coordinator node was populated correctly by the
FIND_COORDINATOR request
@@ -1594,7 +1615,7 @@ class PersisterStateManagerTest {
// Verifying the result returned in correct
assertEquals(partition, partitionResult.partition());
- assertEquals(Errors.NOT_COORDINATOR.code(),
partitionResult.errorCode());
+ assertEquals(Errors.NONE.code(), partitionResult.errorCode());
try {
// Stopping the state manager
@@ -1624,7 +1645,7 @@ class PersisterStateManagerTest {
new FindCoordinatorResponseData()
.setCoordinators(Collections.singletonList(
new FindCoordinatorResponseData.Coordinator()
-
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+ .setErrorCode(Errors.NOT_COORDINATOR.code())
))
),
suppliedNode