This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5cf9872e8f3 KAFKA-18017: Fix call order for HB error and group manager (#17805)
5cf9872e8f3 is described below

commit 5cf9872e8f37c5f48e1200259c1d7f784273e5a6
Author: Lianet Magrans <[email protected]>
AuthorDate: Sun Nov 17 19:12:25 2024 -0500

    KAFKA-18017: Fix call order for HB error and group manager (#17805)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../internals/AbstractHeartbeatRequestManager.java | 10 ++++---
 .../ConsumerHeartbeatRequestManagerTest.java       | 33 ++++++++++++++++++++++
 2 files changed, 39 insertions(+), 4 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
index 55b4f65c848..031ccee70ae 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
@@ -155,8 +155,7 @@ public abstract class AbstractHeartbeatRequestManager<R 
extends AbstractResponse
      */
     @Override
     public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
-        if (!coordinatorRequestManager.coordinator().isPresent() ||
-                membershipManager().shouldSkipHeartbeat()) {
+        if (coordinatorRequestManager.coordinator().isEmpty() || 
membershipManager().shouldSkipHeartbeat()) {
             membershipManager().onHeartbeatRequestSkipped();
             return NetworkClientDelegate.PollResult.EMPTY;
         }
@@ -305,7 +304,6 @@ public abstract class AbstractHeartbeatRequestManager<R 
extends AbstractResponse
     private void onFailure(final Throwable exception, final long 
responseTimeMs) {
         this.heartbeatRequestState.onFailedAttempt(responseTimeMs);
         resetHeartbeatState();
-        membershipManager().onHeartbeatFailure(exception instanceof 
RetriableException);
         if (exception instanceof RetriableException) {
             coordinatorRequestManager.handleCoordinatorDisconnect(exception, 
responseTimeMs);
             String message = String.format("%s failed because of the retriable 
exception. Will retry in %s ms: %s",
@@ -317,6 +315,8 @@ public abstract class AbstractHeartbeatRequestManager<R 
extends AbstractResponse
             logger.error("{} failed due to fatal error: {}", 
heartbeatRequestName(), exception.getMessage());
             handleFatalFailure(exception);
         }
+        // Notify the group manager about the failure after all errors have been handled and propagated.
+        membershipManager().onHeartbeatFailure(exception instanceof 
RetriableException);
     }
 
     private void onResponse(final R response, final long currentTimeMs) {
@@ -336,7 +336,6 @@ public abstract class AbstractHeartbeatRequestManager<R 
extends AbstractResponse
 
         resetHeartbeatState();
         this.heartbeatRequestState.onFailedAttempt(currentTimeMs);
-        membershipManager().onHeartbeatFailure(false);
 
         switch (error) {
             case NOT_COORDINATOR:
@@ -415,6 +414,9 @@ public abstract class AbstractHeartbeatRequestManager<R 
extends AbstractResponse
                 }
                 break;
         }
+
+        // Notify the group manager about the failure after all errors have been handled and propagated.
+        membershipManager().onHeartbeatFailure(false);
     }
 
     protected void logInfo(final String message, final R response, final long 
currentTimeMs) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
index 6aa9924769a..0d76fc88274 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
@@ -27,6 +27,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
@@ -52,6 +53,7 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -74,6 +76,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -418,6 +421,36 @@ public class ConsumerHeartbeatRequestManagerTest {
         verify(backgroundEventHandler).add(any());
     }
 
+    @Test
+    public void 
testHeartbeatResponseErrorNotifiedToGroupManagerAfterErrorPropagated() {
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+        ClientResponse response = 
createHeartbeatResponse(result.unsentRequests.get(0), 
Errors.GROUP_AUTHORIZATION_FAILED);
+        result.unsentRequests.get(0).handler().onComplete(response);
+
+        // The error should be propagated before notifying the group manager. This ensures that the app thread is aware
+        // of the HB error before the manager completes any ongoing unsubscribe.
+        InOrder inOrder = inOrder(backgroundEventHandler, membershipManager);
+        inOrder.verify(backgroundEventHandler).add(any(ErrorEvent.class));
+        inOrder.verify(membershipManager).onHeartbeatFailure(false);
+    }
+
+    @Test
+    public void 
testHeartbeatRequestFailureNotifiedToGroupManagerAfterErrorPropagated() {
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+        ClientResponse response = 
createHeartbeatResponse(result.unsentRequests.get(0), 
Errors.GROUP_AUTHORIZATION_FAILED);
+        result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), 
new AuthenticationException("Fatal error in HB"));
+
+        // The error should be propagated before notifying the group manager. This ensures that the app thread is aware
+        // of the HB error before the manager completes any ongoing unsubscribe.
+        InOrder inOrder = inOrder(backgroundEventHandler, membershipManager);
+        inOrder.verify(backgroundEventHandler).add(any(ErrorEvent.class));
+        inOrder.verify(membershipManager).onHeartbeatFailure(false);
+    }
+
     @Test
     public void testNoCoordinator() {
         
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());

Reply via email to