This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new aee7a3730d0 MINOR : Tolerate GroupIdNotFoundException in consumer when
leaving a group. (#21239)
aee7a3730d0 is described below
commit aee7a3730d0b689ee6a3043745eb295dad1fd59d
Author: Shivsundar R <[email protected]>
AuthorDate: Tue Jan 6 05:20:03 2026 -0500
MINOR : Tolerate GroupIdNotFoundException in consumer when leaving a group.
(#21239)
*What*
- Currently, if a consumer/share-consumer calls `close()` before it has
joined a group, the heartbeat sent on close carries `epoch` = -1 and the
broker returns a `GROUP_ID_NOT_FOUND` error (`GroupIdNotFoundException`).
- This was causing a couple of tests in `ShareConsumerTest` to be flaky
when the heartbeat for the group was sent with `epoch` = -1 (a leave)
before the member had joined.
- Since this can also occur in real scenarios, this change tolerates the
exception while the member is leaving the group, so that the consumer
can close cleanly.
Reviewers: Andrew Schofield <[email protected]>
---
.../internals/AbstractHeartbeatRequestManager.java | 16 +++++++
.../ConsumerHeartbeatRequestManagerTest.java | 51 ++++++++++++++++++++++
.../ShareHeartbeatRequestManagerTest.java | 51 ++++++++++++++++++++++
3 files changed, 118 insertions(+)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
index 17e2b2609c6..4f47c859114 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
@@ -432,6 +432,22 @@ public abstract class AbstractHeartbeatRequestManager<R
extends AbstractResponse
"subscribe. " + errorMessage));
break;
+ case GROUP_ID_NOT_FOUND:
+ // If the group doesn't exist (e.g., member never joined due
to InvalidTopicException),
+ // GROUP_ID_NOT_FOUND should be ignored - the leave is
effectively complete.
+ // When a leave heartbeat (epoch=-1) is sent, the state
transitions synchronously
+ // from LEAVING to UNSUBSCRIBED in
onHeartbeatRequestGenerated() before the request is sent.
+ if (membershipManager().state() == MemberState.UNSUBSCRIBED) {
+ logger.info("{} received GROUP_ID_NOT_FOUND for group {}
while unsubscribed. ",
+ heartbeatRequestName(),
membershipManager().groupId());
+ membershipManager().onHeartbeatRequestSkipped();
+ } else {
+ // Else, this is a fatal error, we should throw it and
transition to fatal state.
+ logger.error("{} failed due to unexpected error {}: {}",
heartbeatRequestName(), error, errorMessage);
+ handleFatalFailure(error.exception(errorMessage));
+ }
+ break;
+
default:
if (!handleSpecificExceptionInResponse(response,
currentTimeMs)) {
// If the manager receives an unknown error - there could
be a bug in the code or a new error code
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
index 9063ae5ab5b..3ef1d712c96 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
@@ -429,6 +429,57 @@ public class ConsumerHeartbeatRequestManagerTest {
verify(backgroundEventHandler).add(any());
}
+ /**
+ * Test that GROUP_ID_NOT_FOUND error while unsubscribed is not a fatal
error.
+ * This can happen when the consumer never successfully joined the group
+ * (e.g., due to an InvalidTopicException during poll()), and close() sends
+ * a leave heartbeat for a group that was never created.
+ */
+ @Test
+ public void testGroupIdNotFoundExceptionWhileUnsubscribed() {
+ // Setup: member is in UNSUBSCRIBED state with epoch -1
+ when(membershipManager.state()).thenReturn(MemberState.UNSUBSCRIBED);
+ when(membershipManager.memberEpoch()).thenReturn(-1);
+
+ time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+ NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result.unsentRequests.size());
+
+ // Complete the heartbeat with GROUP_ID_NOT_FOUND error
+ ClientResponse response =
createHeartbeatResponse(result.unsentRequests.get(0),
Errors.GROUP_ID_NOT_FOUND);
+ result.unsentRequests.get(0).handler().onComplete(response);
+
+ // Verify: no fatal error, heartbeat skipped (benign)
+ verify(membershipManager, never()).transitionToFatal();
+ verify(membershipManager).onHeartbeatRequestSkipped();
+ verify(backgroundEventHandler, never()).add(any());
+ }
+
+ /**
+ * Test that GROUP_ID_NOT_FOUND error while stable is treated as fatal.
+ * This would indicate the group was unexpectedly deleted while the member
+ * was actively participating.
+ */
+ @Test
+ public void testGroupIdNotFoundWhileStableIsFatal() {
+ // Setup: member is in STABLE state with positive epoch
+ when(membershipManager.state()).thenReturn(MemberState.STABLE);
+ when(membershipManager.memberEpoch()).thenReturn(DEFAULT_MEMBER_EPOCH);
+
+ time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+ NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result.unsentRequests.size());
+
+ // Complete the heartbeat with GROUP_ID_NOT_FOUND error
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
+ ClientResponse response =
createHeartbeatResponse(result.unsentRequests.get(0),
Errors.GROUP_ID_NOT_FOUND);
+ result.unsentRequests.get(0).handler().onComplete(response);
+
+ // Verify: fatal error
+ verify(membershipManager).transitionToFatal();
+ verify(backgroundEventHandler).add(any());
+ }
+
@Test
public void
testHeartbeatResponseErrorNotifiedToGroupManagerAfterErrorPropagated() {
time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
index 8952271b250..20528e775de 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
@@ -354,6 +354,57 @@ public class ShareHeartbeatRequestManagerTest {
verify(backgroundEventHandler).add(any());
}
+ /**
+ * Test that GROUP_ID_NOT_FOUND error while unsubscribed is not treated as
fatal.
+ * This can happen when the consumer never successfully joined the group
+ * (e.g., due to an InvalidTopicException during poll()), and close() sends
+ * a leave heartbeat for a group that was never created.
+ */
+ @Test
+ public void testGroupIdNotFoundExceptionWhileUnsubscribed() {
+ // Setup: member is in UNSUBSCRIBED state with epoch -1
+ when(membershipManager.state()).thenReturn(MemberState.UNSUBSCRIBED);
+ when(membershipManager.memberEpoch()).thenReturn(-1);
+
+ time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+ NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result.unsentRequests.size());
+
+ // Complete the heartbeat with GROUP_ID_NOT_FOUND error
+ ClientResponse response =
createHeartbeatResponse(result.unsentRequests.get(0),
Errors.GROUP_ID_NOT_FOUND);
+ result.unsentRequests.get(0).handler().onComplete(response);
+
+ // Verify: no fatal error, heartbeat skipped (benign)
+ verify(membershipManager, never()).transitionToFatal();
+ verify(membershipManager).onHeartbeatRequestSkipped();
+ verify(backgroundEventHandler, never()).add(any());
+ }
+
+ /**
+ * Test that GROUP_ID_NOT_FOUND error while stable is treated as fatal.
+ * This would indicate the group was unexpectedly deleted while the member
+ * was actively participating.
+ */
+ @Test
+ public void testGroupIdNotFoundWhileStableIsFatal() {
+ // Setup: member is in STABLE state with positive epoch
+ when(membershipManager.state()).thenReturn(MemberState.STABLE);
+ when(membershipManager.memberEpoch()).thenReturn(DEFAULT_MEMBER_EPOCH);
+
+ time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+ NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result.unsentRequests.size());
+
+ // Complete the heartbeat with GROUP_ID_NOT_FOUND error
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
+ ClientResponse response =
createHeartbeatResponse(result.unsentRequests.get(0),
Errors.GROUP_ID_NOT_FOUND);
+ result.unsentRequests.get(0).handler().onComplete(response);
+
+ // Verify: fatal error
+ verify(membershipManager).transitionToFatal();
+ verify(backgroundEventHandler).add(any());
+ }
+
@Test
public void testNoCoordinator() {
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());