This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5cf9872e8f3 KAFKA-18017: Fix call order for HB error and group manager (#17805)
5cf9872e8f3 is described below
commit 5cf9872e8f37c5f48e1200259c1d7f784273e5a6
Author: Lianet Magrans <[email protected]>
AuthorDate: Sun Nov 17 19:12:25 2024 -0500
KAFKA-18017: Fix call order for HB error and group manager (#17805)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../internals/AbstractHeartbeatRequestManager.java | 10 ++++---
.../ConsumerHeartbeatRequestManagerTest.java | 33 ++++++++++++++++++++++
2 files changed, 39 insertions(+), 4 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
index 55b4f65c848..031ccee70ae 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
@@ -155,8 +155,7 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
*/
@Override
public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
- if (!coordinatorRequestManager.coordinator().isPresent() ||
- membershipManager().shouldSkipHeartbeat()) {
+ if (coordinatorRequestManager.coordinator().isEmpty() || membershipManager().shouldSkipHeartbeat()) {
membershipManager().onHeartbeatRequestSkipped();
return NetworkClientDelegate.PollResult.EMPTY;
}
@@ -305,7 +304,6 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
private void onFailure(final Throwable exception, final long responseTimeMs) {
this.heartbeatRequestState.onFailedAttempt(responseTimeMs);
resetHeartbeatState();
- membershipManager().onHeartbeatFailure(exception instanceof RetriableException);
if (exception instanceof RetriableException) {
coordinatorRequestManager.handleCoordinatorDisconnect(exception, responseTimeMs);
String message = String.format("%s failed because of the retriable exception. Will retry in %s ms: %s",
@@ -317,6 +315,8 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
logger.error("{} failed due to fatal error: {}", heartbeatRequestName(), exception.getMessage());
handleFatalFailure(exception);
}
+ // Notify the group manager about the failure after all errors have been handled and propagated.
+ membershipManager().onHeartbeatFailure(exception instanceof RetriableException);
}
private void onResponse(final R response, final long currentTimeMs) {
@@ -336,7 +336,6 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
resetHeartbeatState();
this.heartbeatRequestState.onFailedAttempt(currentTimeMs);
- membershipManager().onHeartbeatFailure(false);
switch (error) {
case NOT_COORDINATOR:
@@ -415,6 +414,9 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
}
break;
}
+
+ // Notify the group manager about the failure after all errors have been handled and propagated.
+ membershipManager().onHeartbeatFailure(false);
}
protected void logInfo(final String message, final R response, final long currentTimeMs) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
index 6aa9924769a..0d76fc88274 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
@@ -52,6 +53,7 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
import java.util.Arrays;
import java.util.Collection;
@@ -74,6 +76,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@@ -418,6 +421,36 @@ public class ConsumerHeartbeatRequestManagerTest {
verify(backgroundEventHandler).add(any());
}
+ @Test
+ public void testHeartbeatResponseErrorNotifiedToGroupManagerAfterErrorPropagated() {
+ time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+ NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result.unsentRequests.size());
+ ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_AUTHORIZATION_FAILED);
+ result.unsentRequests.get(0).handler().onComplete(response);
+
+ // The error should be propagated before notifying the group manager. This ensures that the app thread is aware
+ // of the HB error before the manager completes any ongoing unsubscribe.
+ InOrder inOrder = inOrder(backgroundEventHandler, membershipManager);
+ inOrder.verify(backgroundEventHandler).add(any(ErrorEvent.class));
+ inOrder.verify(membershipManager).onHeartbeatFailure(false);
+ }
+
+ @Test
+ public void testHeartbeatRequestFailureNotifiedToGroupManagerAfterErrorPropagated() {
+ time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+ NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result.unsentRequests.size());
+ ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_AUTHORIZATION_FAILED);
+ result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new AuthenticationException("Fatal error in HB"));
+
+ // The error should be propagated before notifying the group manager. This ensures that the app thread is aware
+ // of the HB error before the manager completes any ongoing unsubscribe.
+ InOrder inOrder = inOrder(backgroundEventHandler, membershipManager);
+ inOrder.verify(backgroundEventHandler).add(any(ErrorEvent.class));
+ inOrder.verify(membershipManager).onHeartbeatFailure(false);
+ }
+
@Test
public void testNoCoordinator() {
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());