This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a0292bae399 KAFKA-17463 add retry in Persister for NOT_COORDINATOR error (#17645)
a0292bae399 is described below

commit a0292bae39941ee5ca89380744129bea1db2aa6c
Author: Apoorv Mittal <[email protected]>
AuthorDate: Mon Nov 4 13:06:39 2024 +0000

    KAFKA-17463 add retry in Persister for NOT_COORDINATOR error (#17645)
    
    Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../share/persister/PersisterStateManager.java     |  3 +
 .../share/persister/PersisterStateManagerTest.java | 67 ++++++++++++++--------
 2 files changed, 47 insertions(+), 23 deletions(-)

diff --git 
a/share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
 
b/share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
index 1f0fddf7983..45bbf52a39d 100644
--- 
a/share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
+++ 
b/share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
@@ -401,6 +401,7 @@ public class PersisterStateManager {
 
                 case COORDINATOR_NOT_AVAILABLE: // retryable error codes
                 case COORDINATOR_LOAD_IN_PROGRESS:
+                case NOT_COORDINATOR:
                     log.warn("Received retryable error in find coordinator: 
{}", error.message());
                     if (!findCoordBackoff.canAttempt()) {
                         log.error("Exhausted max retries to find coordinator 
without success.");
@@ -537,6 +538,7 @@ public class PersisterStateManager {
                             // check retryable errors
                             case COORDINATOR_NOT_AVAILABLE:
                             case COORDINATOR_LOAD_IN_PROGRESS:
+                            case NOT_COORDINATOR:
                                 log.warn("Received retryable error in write 
state RPC: {}", error.message());
                                 if (!writeStateBackoff.canAttempt()) {
                                     log.error("Exhausted max retries for write 
state RPC without success.");
@@ -679,6 +681,7 @@ public class PersisterStateManager {
                             // check retryable errors
                             case COORDINATOR_NOT_AVAILABLE:
                             case COORDINATOR_LOAD_IN_PROGRESS:
+                            case NOT_COORDINATOR:
                                 log.warn("Received retryable error in read 
state RPC: {}", error.message());
                                 if (!readStateBackoff.canAttempt()) {
                                     log.error("Exhausted max retries for read 
state RPC without success.");
diff --git 
a/share/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
 
b/share/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
index edcebd0d3d2..464dd4c0517 100644
--- 
a/share/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
+++ 
b/share/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
@@ -625,7 +625,7 @@ class PersisterStateManagerTest {
     }
 
     @Test
-    public void testWriteStateRequestFailButCoordinatorSuccess() {
+    public void 
testWriteStateRequestRetryWithNotCoordinatorSuccessfulOnRetry() throws 
InterruptedException, ExecutionException {
         MockClient client = new MockClient(MOCK_TIME);
 
         String groupId = "group1";
@@ -641,6 +641,19 @@ class PersisterStateManagerTest {
 
         String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId, 
topicId, partition);
 
+        client.prepareResponseFrom(body -> body instanceof 
FindCoordinatorRequest
+                && ((FindCoordinatorRequest) body).data().keyType() == 
FindCoordinatorRequest.CoordinatorType.SHARE.id()
+                && ((FindCoordinatorRequest) 
body).data().coordinatorKeys().get(0).equals(coordinatorKey),
+            new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setCoordinators(Collections.singletonList(
+                        new FindCoordinatorResponseData.Coordinator()
+                            .setErrorCode(Errors.NOT_COORDINATOR.code())
+                    ))
+            ),
+            suppliedNode
+        );
+
         client.prepareResponseFrom(body -> body instanceof 
FindCoordinatorRequest
                 && ((FindCoordinatorRequest) body).data().keyType() == 
FindCoordinatorRequest.CoordinatorType.SHARE.id()
                 && ((FindCoordinatorRequest) 
body).data().coordinatorKeys().get(0).equals(coordinatorKey),
@@ -672,8 +685,8 @@ class PersisterStateManagerTest {
                         .setPartitions(Collections.singletonList(
                             new 
WriteShareGroupStateResponseData.PartitionResult()
                                 .setPartition(partition)
-                                .setErrorCode(Errors.NOT_COORDINATOR.code())
-                                
.setErrorMessage(Errors.NOT_COORDINATOR.message())
+                                .setErrorCode(Errors.NONE.code())
+                                .setErrorMessage("")
                         ))
                 ))
         ), coordinatorNode);
@@ -708,16 +721,12 @@ class PersisterStateManagerTest {
 
         CompletableFuture<WriteShareGroupStateResponse> resultFuture = 
handler.result();
 
-        WriteShareGroupStateResponse result = null;
-        try {
-            result = resultFuture.get();
-        } catch (Exception e) {
-            fail("Failed to get result from future", e);
-        }
+        TestUtils.waitForCondition(resultFuture::isDone, 
TestUtils.DEFAULT_MAX_WAIT_MS, 10L, () -> "Failed to get result from future");
 
+        WriteShareGroupStateResponse result = resultFuture.get();
         WriteShareGroupStateResponseData.PartitionResult partitionResult = 
result.data().results().get(0).partitions().get(0);
 
-        verify(handler, times(1)).findShareCoordinatorBuilder();
+        verify(handler, times(2)).findShareCoordinatorBuilder();
         verify(handler, times(0)).requestBuilder();
 
         // Verifying the coordinator node was populated correctly by the 
FIND_COORDINATOR request
@@ -725,7 +734,7 @@ class PersisterStateManagerTest {
 
         // Verifying the result returned is correct
         assertEquals(partition, partitionResult.partition());
-        assertEquals(Errors.NOT_COORDINATOR.code(), 
partitionResult.errorCode());
+        assertEquals(Errors.NONE.code(), partitionResult.errorCode());
 
         try {
             // Stopping the state manager
@@ -1500,7 +1509,7 @@ class PersisterStateManagerTest {
     }
 
     @Test
-    public void testReadStateRequestFailButCoordinatorFoundSuccessfully() {
+    public void testReadStateRequestRetryWithNotCoordinatorSuccessfulOnRetry() 
throws ExecutionException, InterruptedException {
         MockClient client = new MockClient(MOCK_TIME);
 
         String groupId = "group1";
@@ -1512,6 +1521,19 @@ class PersisterStateManagerTest {
 
         String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId, 
topicId, partition);
 
+        client.prepareResponseFrom(body -> body instanceof 
FindCoordinatorRequest
+                && ((FindCoordinatorRequest) body).data().keyType() == 
FindCoordinatorRequest.CoordinatorType.SHARE.id()
+                && ((FindCoordinatorRequest) 
body).data().coordinatorKeys().get(0).equals(coordinatorKey),
+            new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setCoordinators(Collections.singletonList(
+                        new FindCoordinatorResponseData.Coordinator()
+                            .setErrorCode(Errors.NOT_COORDINATOR.code())
+                    ))
+            ),
+            suppliedNode
+        );
+
         client.prepareResponseFrom(body -> body instanceof 
FindCoordinatorRequest
                 && ((FindCoordinatorRequest) body).data().keyType() == 
FindCoordinatorRequest.CoordinatorType.SHARE.id()
                 && ((FindCoordinatorRequest) 
body).data().coordinatorKeys().get(0).equals(coordinatorKey),
@@ -1543,8 +1565,11 @@ class PersisterStateManagerTest {
                         .setPartitions(Collections.singletonList(
                             new 
ReadShareGroupStateResponseData.PartitionResult()
                                 .setPartition(partition)
-                                .setErrorCode(Errors.NOT_COORDINATOR.code())
-                                
.setErrorMessage(Errors.NOT_COORDINATOR.message())
+                                .setErrorCode(Errors.NONE.code())
+                                .setErrorMessage("")
+                                .setStateEpoch(1)
+                                .setStartOffset(0)
+                                .setStateBatches(Collections.emptyList())
                         ))
                 ))
         ), coordinatorNode);
@@ -1577,16 +1602,12 @@ class PersisterStateManagerTest {
 
         CompletableFuture<ReadShareGroupStateResponse> resultFuture = 
handler.result();
 
-        ReadShareGroupStateResponse result = null;
-        try {
-            result = resultFuture.get();
-        } catch (Exception e) {
-            fail("Failed to get result from future", e);
-        }
+        TestUtils.waitForCondition(resultFuture::isDone, 
TestUtils.DEFAULT_MAX_WAIT_MS, 10L, () -> "Failed to get result from future");
 
+        ReadShareGroupStateResponse result = resultFuture.get();
         ReadShareGroupStateResponseData.PartitionResult partitionResult = 
result.data().results().get(0).partitions().get(0);
 
-        verify(handler, times(1)).findShareCoordinatorBuilder();
+        verify(handler, times(2)).findShareCoordinatorBuilder();
         verify(handler, times(0)).requestBuilder();
 
         // Verifying the coordinator node was populated correctly by the 
FIND_COORDINATOR request
@@ -1594,7 +1615,7 @@ class PersisterStateManagerTest {
 
         // Verifying the result returned in correct
         assertEquals(partition, partitionResult.partition());
-        assertEquals(Errors.NOT_COORDINATOR.code(), 
partitionResult.errorCode());
+        assertEquals(Errors.NONE.code(), partitionResult.errorCode());
 
         try {
             // Stopping the state manager
@@ -1624,7 +1645,7 @@ class PersisterStateManagerTest {
                 new FindCoordinatorResponseData()
                     .setCoordinators(Collections.singletonList(
                         new FindCoordinatorResponseData.Coordinator()
-                            
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+                            .setErrorCode(Errors.NOT_COORDINATOR.code())
                     ))
             ),
             suppliedNode

Reply via email to