This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2185004083e KAFKA-16251: Fix for not sending heartbeat while fenced
(#15392)
2185004083e is described below
commit 2185004083ebb8f0b3a443132b5a33908c459c65
Author: Lianet Magrans <[email protected]>
AuthorDate: Fri Feb 23 04:56:05 2024 -0500
KAFKA-16251: Fix for not sending heartbeat while fenced (#15392)
Fix to ensure that a consumer that has been fenced by the coordinator stops
sending heartbeats while it is on the FENCED state releasing its assignment
(waiting for the onPartitionsLost callback to complete). Once the callback
completes, the member transitions to JOINING and it's then when it should
resume sending heartbeats again.
Reviewers: Lucas Brutschy <[email protected]>
---
.../consumer/internals/MembershipManagerImpl.java | 14 +++++------
.../internals/HeartbeatRequestManagerTest.java | 27 ++++++++++++++++++++++
.../internals/MembershipManagerImplTest.java | 2 ++
3 files changed, 35 insertions(+), 8 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index dd035506d4b..81e65dfd866 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -720,18 +720,16 @@ public class MembershipManagerImpl implements
MembershipManager {
}
/**
- * @return True if the member should not send heartbeats, which would be
one of the following
- * cases:
- * <ul>
- * <li>Member is not subscribed to any topics</li>
- * <li>Member has received a fatal error in a previous heartbeat
response</li>
- * <li>Member is stale, meaning that it has left the group due to expired
poll timer</li>
- * </ul>
+ * @return True if the member should not send heartbeats, which is the
case when it is in a
+ * state where it is not an active member of the group.
*/
@Override
public boolean shouldSkipHeartbeat() {
MemberState state = state();
- return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL
|| state == MemberState.STALE;
+ return state == MemberState.UNSUBSCRIBED ||
+ state == MemberState.FATAL ||
+ state == MemberState.STALE ||
+ state == MemberState.FENCED;
}
/**
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
index 4016b74b27b..72a5c0349d2 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -663,6 +663,33 @@ public class HeartbeatRequestManagerTest {
assertEquals((double) randomSleepS,
getMetric("last-heartbeat-seconds-ago").metricValue());
}
+ @Test
+ public void
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
+ mockStableMember();
+
+ time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+ NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result.unsentRequests.size());
+
+ // Receive HB response fencing member
+ when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
+ doNothing().when(membershipManager).transitionToFenced();
+ ClientResponse response =
createHeartbeatResponse(result.unsentRequests.get(0),
Errors.FENCED_MEMBER_EPOCH);
+ result.unsentRequests.get(0).handler().onComplete(response);
+
+ verify(membershipManager).transitionToFenced();
+ verify(heartbeatRequestState).onFailedAttempt(anyLong());
+ verify(heartbeatRequestState).reset();
+
+ when(membershipManager.state()).thenReturn(MemberState.FENCED);
+ result = heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(0, result.unsentRequests.size(), "Member should not send
heartbeats while FENCED");
+
+ when(membershipManager.state()).thenReturn(MemberState.JOINING);
+ result = heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result.unsentRequests.size(), "Fenced member should
resume heartbeat after transitioning to JOINING");
+ }
+
private void assertHeartbeat(HeartbeatRequestManager hrm, int nextPollMs) {
NetworkClientDelegate.PollResult pollResult =
hrm.poll(time.milliseconds());
assertEquals(1, pollResult.unsentRequests.size());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index f5c65d58bfc..50f28bb5233 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -1705,6 +1705,8 @@ public class MembershipManagerImplTest {
assertEquals(0, listener.assignedCount());
assertEquals(0, listener.lostCount());
+ assertTrue(membershipManager.shouldSkipHeartbeat(), "Member should not
send heartbeat while fenced");
+
// Step 3: invoke the callback
performCallback(
membershipManager,