This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6a4078ddde7 KAFKA-16215; KAFKA-16178; Fix member not rejoining after 
error (#15311)
6a4078ddde7 is described below

commit 6a4078ddde7888ca12582b6dff13f9ece0cf4d59
Author: Lianet Magrans <[email protected]>
AuthorDate: Mon Feb 12 02:16:57 2024 -0500

    KAFKA-16215; KAFKA-16178; Fix member not rejoining after error (#15311)
    
    This fixes a bug that was causing that members wouldn't rejoin the group 
after receiving an error in the heartbeat response (ex. fenced, not 
coordinator, as reported in KAFKA-16215 and KAFKA-16178). The issue was that 
when receiving a response with error, the response receive time was not being 
updated, so following heartbeat would be skipped, considering that there was 
already a previous one inflight.
    
    Reviewers: Andrew Schofield <[email protected]>, David Jacot 
<[email protected]>
---
 .../internals/HeartbeatRequestManager.java         | 17 +++++++++++-----
 .../internals/HeartbeatRequestManagerTest.java     | 23 +++++++++++++++++++---
 2 files changed, 32 insertions(+), 8 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
index 03e11ddfa02..246ea05b220 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -267,11 +267,12 @@ public class HeartbeatRequestManager implements 
RequestManager {
             return logResponse(request);
         else
             return request.whenComplete((response, exception) -> {
+                long completionTimeMs = request.handler().completionTimeMs();
                 if (response != null) {
                     
metricsManager.recordRequestLatency(response.requestLatencyMs());
-                    onResponse((ConsumerGroupHeartbeatResponse) 
response.responseBody(), request.handler().completionTimeMs());
+                    onResponse((ConsumerGroupHeartbeatResponse) 
response.responseBody(), completionTimeMs);
                 } else {
-                    onFailure(exception, request.handler().completionTimeMs());
+                    onFailure(exception, completionTimeMs);
                 }
             });
     }
@@ -341,9 +342,8 @@ public class HeartbeatRequestManager implements 
RequestManager {
         String message;
 
         this.heartbeatState.reset();
+        this.heartbeatRequestState.onFailedAttempt(currentTimeMs);
 
-        // TODO: upon encountering a fatal/fenced error, trigger 
onPartitionLost logic to give up the current
-        //  assignments.
         switch (error) {
             case NOT_COORDINATOR:
                 // the manager should retry immediately when the coordinator 
node becomes available again
@@ -352,6 +352,8 @@ public class HeartbeatRequestManager implements 
RequestManager {
                         coordinatorRequestManager.coordinator());
                 logInfo(message, response, currentTimeMs);
                 coordinatorRequestManager.markCoordinatorUnknown(errorMessage, 
currentTimeMs);
+                // Skip backoff so that the next HB is sent as soon as the new 
coordinator is discovered
+                heartbeatRequestState.reset();
                 break;
 
             case COORDINATOR_NOT_AVAILABLE:
@@ -360,6 +362,8 @@ public class HeartbeatRequestManager implements 
RequestManager {
                         coordinatorRequestManager.coordinator());
                 logInfo(message, response, currentTimeMs);
                 coordinatorRequestManager.markCoordinatorUnknown(errorMessage, 
currentTimeMs);
+                // Skip backoff so that the next HB is sent as soon as the new 
coordinator is discovered
+                heartbeatRequestState.reset();
                 break;
 
             case COORDINATOR_LOAD_IN_PROGRESS:
@@ -368,7 +372,6 @@ public class HeartbeatRequestManager implements 
RequestManager {
                                 "Will retry",
                         coordinatorRequestManager.coordinator());
                 logInfo(message, response, currentTimeMs);
-                heartbeatRequestState.onFailedAttempt(currentTimeMs);
                 break;
 
             case GROUP_AUTHORIZATION_FAILED:
@@ -397,6 +400,8 @@ public class HeartbeatRequestManager implements 
RequestManager {
                         membershipManager.memberId(), 
membershipManager.memberEpoch());
                 logInfo(message, response, currentTimeMs);
                 membershipManager.transitionToFenced();
+                // Skip backoff so that a next HB to rejoin is sent as soon as 
the fenced member releases its assignment
+                heartbeatRequestState.reset();
                 break;
 
             case UNKNOWN_MEMBER_ID:
@@ -404,6 +409,8 @@ public class HeartbeatRequestManager implements 
RequestManager {
                         membershipManager.memberId());
                 logInfo(message, response, currentTimeMs);
                 membershipManager.transitionToFenced();
+                // Skip backoff so that a next HB to rejoin is sent as soon as 
the fenced member releases its assignment
+                heartbeatRequestState.reset();
                 break;
 
             default:
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
index 0df6fa94c39..bc014efa773 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -445,16 +445,26 @@ public class HeartbeatRequestManagerTest {
 
             case COORDINATOR_LOAD_IN_PROGRESS:
                 verify(backgroundEventHandler, never()).add(any());
-                assertEquals(DEFAULT_RETRY_BACKOFF_MS, 
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()));
+                assertEquals(DEFAULT_RETRY_BACKOFF_MS,
+                    
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()), "Request should " +
+                        "backoff after receiving a coordinator load in 
progress error. ");
                 break;
 
             case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
                 verify(backgroundEventHandler, never()).add(any());
                 
verify(coordinatorRequestManager).markCoordinatorUnknown(any(), anyLong());
-                assertEquals(0, 
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()));
+                assertEquals(0, 
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()),
+                    "Request should not apply backoff so that the next 
heartbeat is sent " +
+                        "as soon as the new coordinator is discovered.");
+                break;
+            case UNKNOWN_MEMBER_ID:
+            case FENCED_MEMBER_EPOCH:
+                verify(backgroundEventHandler, never()).add(any());
+                assertEquals(0, 
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()),
+                    "Request should not apply backoff so that the next 
heartbeat to rejoin is " +
+                        "sent as soon as the fenced member releases its 
assignment.");
                 break;
-
             default:
                 if (isFatal) {
                     // The memberStateManager should have stopped heartbeat at 
this point
@@ -465,6 +475,13 @@ public class HeartbeatRequestManagerTest {
                 }
                 break;
         }
+
+        if (!isFatal) {
+            // Make sure a next heartbeat is sent for all non-fatal errors (to 
retry or rejoin)
+            time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+            result = heartbeatRequestManager.poll(time.milliseconds());
+            assertEquals(1, result.unsentRequests.size());
+        }
     }
 
     @Test

Reply via email to