This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9bc9fae9425 KAFKA-16258: callback to release assignment when stale
member leaves group (#15415)
9bc9fae9425 is described below
commit 9bc9fae9425e4dac64ef078cd3a4e7e6e09cc45a
Author: Lianet Magrans <[email protected]>
AuthorDate: Mon Feb 26 05:39:33 2024 -0500
KAFKA-16258: callback to release assignment when stale member leaves group
(#15415)
Introduce a call to the onPartitionsLost callback to release the assignment when a
consumer proactively leaves the group because its poll timer expired.
When the poll timer expires, the member sends a leave group request
(reusing the existing LEAVING state and logic), and then transitions to STALE
to release its assignment and wait for the poll timer to be reset. Once both
conditions are met (assignment released and poll timer reset), the consumer
transitions out of the STALE state to rejoin the group. Note that while in the
STALE state, the member is not part of the group, so it does not send heartbeats.
This PR also includes a fix to ensure that any heartbeat responses received
while the member is STALE, or in any other state where it is not part of the
group, are ignored.
Reviewers: Lucas Brutschy <[email protected]>
---
.../internals/HeartbeatRequestManager.java | 15 +-
.../clients/consumer/internals/MemberState.java | 17 +-
.../consumer/internals/MembershipManager.java | 11 +-
.../consumer/internals/MembershipManagerImpl.java | 112 +++++++++--
.../internals/HeartbeatRequestManagerTest.java | 16 +-
.../internals/MembershipManagerImplTest.java | 217 ++++++++++++++++-----
.../kafka/api/PlaintextConsumerTest.scala | 4 +-
7 files changed, 299 insertions(+), 93 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
index 2aabcfe20bc..550e5b92ebf 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -200,13 +200,13 @@ public class HeartbeatRequestManager implements
RequestManager {
"messages. You can address this either by increasing
max.poll.interval.ms or by " +
"reducing the maximum size of batches returned in poll() with
max.poll.records.");
- // This should trigger a heartbeat with leave group epoch
- membershipManager.transitionToStale();
- NetworkClientDelegate.UnsentRequest request =
makeHeartbeatRequest(currentTimeMs, true);
+ membershipManager.transitionToSendingLeaveGroup(true);
+ NetworkClientDelegate.UnsentRequest leaveHeartbeat =
makeHeartbeatRequest(currentTimeMs, true);
+
// We can ignore the leave response because we can join before or
after receiving the response.
heartbeatRequestState.reset();
heartbeatState.reset();
- return new
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs,
Collections.singletonList(request));
+ return new
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs,
Collections.singletonList(leaveHeartbeat));
}
boolean heartbeatNow = membershipManager.shouldHeartbeatNow() &&
!heartbeatRequestState.requestInFlight();
@@ -256,11 +256,12 @@ public class HeartbeatRequestManager implements
RequestManager {
* member to {@link MemberState#JOINING}, so that it rejoins the group.
*/
public void resetPollTimer(final long pollMs) {
+ if (pollTimer.isExpired()) {
+ logger.debug("Poll timer has been reset after it had expired");
+ membershipManager.maybeRejoinStaleMember();
+ }
pollTimer.update(pollMs);
pollTimer.reset(maxPollIntervalMs);
- if (membershipManager.state() == MemberState.STALE) {
- membershipManager.transitionToJoining();
- }
}
private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final
long currentTimeMs,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
index 1df4d30e594..9f0c7d947ea 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
@@ -79,11 +79,12 @@ public enum MemberState {
FENCED,
/**
- * The member transitions to this state after a call to unsubscribe. While
in this state, the
- * member will stop sending heartbeats, will commit offsets if needed and
release its
- * assignment (calling user's callback for partitions revoked or lost).
When all these
- * actions complete, the member will transition out of this state into
{@link #LEAVING} to
- * effectively leave the group.
+ * The member transitions to this state before sending a heartbeat to
leave the group.
+ * While in this state, the member will continue sending heartbeats while
it releases its
+ * assignment by calling the user's callback. When callbacks complete, the
member will transition
+ * out of this state into {@link #LEAVING} to send a heartbeat to leave
the group. Note that
+ * if leaving due to an expired poll timer, the member does not execute any
callbacks while in
+ * this state and just transitions to {@link #LEAVING} and then {@link
#STALE}.
*/
PREPARE_LEAVING,
@@ -130,13 +131,13 @@ public enum MemberState {
JOINING.previousValidStates = Arrays.asList(FENCED, UNSUBSCRIBED,
STALE);
PREPARE_LEAVING.previousValidStates = Arrays.asList(JOINING, STABLE,
RECONCILING,
- ACKNOWLEDGING, UNSUBSCRIBED, FENCED);
+ ACKNOWLEDGING, UNSUBSCRIBED);
LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING);
- UNSUBSCRIBED.previousValidStates = Arrays.asList(PREPARE_LEAVING,
LEAVING);
+ UNSUBSCRIBED.previousValidStates = Arrays.asList(PREPARE_LEAVING,
LEAVING, FENCED);
- STALE.previousValidStates = Arrays.asList(JOINING, RECONCILING,
ACKNOWLEDGING, STABLE);
+ STALE.previousValidStates = Arrays.asList(LEAVING);
}
private List<MemberState> previousValidStates;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
index f95552cc3a3..a9c23d7b4d5 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
@@ -159,10 +159,9 @@ public interface MembershipManager extends RequestManager {
void transitionToJoining();
/**
- * When the user stops polling the consumer and the
<code>max.poll.interval.ms</code> timer expires, we transition
- * the member to STALE.
+ * Transition to the {@link MemberState#LEAVING} state to send a heartbeat
to leave the group.
*/
- void transitionToStale();
+ void transitionToSendingLeaveGroup(boolean dueToPollTimerExpired);
/**
* Register a listener that will be called whenever the member state
changes due to
@@ -175,4 +174,10 @@ public interface MembershipManager extends RequestManager {
* leaving (sending last heartbeat).
*/
boolean isLeavingGroup();
+
+ /**
+ * Transition a {@link MemberState#STALE} member to {@link
MemberState#JOINING} when it completes
+ * releasing its assignment. This is expected to be used when the poll
timer is reset.
+ */
+ void maybeRejoinStaleMember();
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index 81e65dfd866..a9b0f3b94d8 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -257,8 +257,23 @@ public class MembershipManagerImpl implements
MembershipManager {
*/
private final BackgroundEventHandler backgroundEventHandler;
+ /**
+ * Future that will complete when a stale member completes releasing its
assignment after
+ * leaving the group due to poll timer expired. Used to make sure that the
member rejoins
+ * when the timer is reset, only when it completes releasing its
assignment.
+ */
+ private CompletableFuture<Void> staleMemberAssignmentRelease;
+
private final Time time;
+ /**
+ * True if the poll timer has expired, signaled by a call to
+ * {@link #transitionToSendingLeaveGroup(boolean)} with
dueToExpiredPollTimer param true. This
+ * will be used to determine that the member should transition to STALE
after leaving the
+ * group, to release its assignment and wait for the timer to be reset.
+ */
+ private boolean isPollTimerExpired;
+
public MembershipManagerImpl(String groupId,
Optional<String> groupInstanceId,
int rebalanceTimeoutMs,
@@ -347,11 +362,17 @@ public class MembershipManagerImpl implements
MembershipManager {
);
throw new IllegalArgumentException(errorMessage);
}
+ MemberState state = state();
if (state == MemberState.LEAVING) {
log.debug("Ignoring heartbeat response received from broker.
Member {} with epoch {} is " +
"already leaving the group.", memberId, memberEpoch);
return;
}
+ if (isNotInGroup()) {
+ log.debug("Ignoring heartbeat response received from broker.
Member {} is in {} state" +
+ " so it's not a member of the group. ", memberId, state);
+ return;
+ }
// Update the group member id label in the client telemetry reporter
if the member id has
// changed. Initially the member id is empty, and it is updated when
the member joins the
@@ -382,6 +403,16 @@ public class MembershipManagerImpl implements
MembershipManager {
}
}
+ /**
+ * @return True if the consumer is not a member of the group.
+ */
+ private boolean isNotInGroup() {
+ return state == MemberState.UNSUBSCRIBED ||
+ state == MemberState.FENCED ||
+ state == MemberState.FATAL ||
+ state == MemberState.STALE;
+ }
+
/**
* This will process the assignment received if it is different from the
member's current
* assignment. If a new assignment is received, this will make sure
reconciliation is attempted
@@ -462,7 +493,12 @@ public class MembershipManagerImpl implements
MembershipManager {
" after member got fenced. Member will rejoin the
group anyways.", error);
}
updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR),
true);
- transitionToJoining();
+ if (state == MemberState.FENCED) {
+ transitionToJoining();
+ } else {
+ log.debug("Fenced member onPartitionsLost callback completed
but the state has " +
+ "already changed to {}, so the member won't rejoin the
group", state);
+ }
});
}
@@ -564,9 +600,11 @@ public class MembershipManagerImpl implements
MembershipManager {
*/
@Override
public CompletableFuture<Void> leaveGroup() {
- if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) {
- // Member is not part of the group. No-op and return completed
future to avoid
- // unnecessary transitions.
+ if (isNotInGroup()) {
+ if (state == MemberState.FENCED) {
+ updateSubscription(new
TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), true);
+ transitionTo(MemberState.UNSUBSCRIBED);
+ }
return CompletableFuture.completedFuture(null);
}
@@ -588,7 +626,7 @@ public class MembershipManagerImpl implements
MembershipManager {
// Transition to ensure that a heartbeat request is sent out to
effectively leave the
// group (even in the case where the member had no assignment to
release or when the
// callback execution failed.)
- transitionToSendingLeaveGroup();
+ transitionToSendingLeaveGroup(false);
});
// Return future to indicate that the leave group is done when the
callbacks
@@ -633,11 +671,15 @@ public class MembershipManagerImpl implements
MembershipManager {
/**
* Reset member epoch to the value required for the leave the group
heartbeat request, and
- * transition to the {@link MemberState#LEAVING} state so that a heartbeat
- * request is sent out with it.
- * Visible for testing.
+ * transition to the {@link MemberState#LEAVING} state so that a heartbeat
request is sent
+ * out with it.
+ *
+ * @param dueToExpiredPollTimer True if the leave group is due to an
expired poll timer. This
+ * will indicate that the member must remain
STALE after leaving,
+ * until it releases its assignment and the
timer is reset.
*/
- void transitionToSendingLeaveGroup() {
+ @Override
+ public void transitionToSendingLeaveGroup(boolean dueToExpiredPollTimer) {
if (state == MemberState.FATAL) {
log.warn("Member {} with epoch {} won't send leave group request
because it is in " +
"FATAL state", memberId, memberEpoch);
@@ -648,6 +690,14 @@ public class MembershipManagerImpl implements
MembershipManager {
memberId);
return;
}
+
+ if (dueToExpiredPollTimer) {
+ this.isPollTimerExpired = true;
+ // Briefly transition through prepare leaving. The member does not
have to release
+ // any assignment before sending the leave group given that is
stale. It will invoke
+ // onPartitionsLost after sending the leave group on the STALE
state.
+ transitionTo(MemberState.PREPARE_LEAVING);
+ }
int leaveEpoch = groupInstanceId.isPresent() ?
ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH :
ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
@@ -690,7 +740,11 @@ public class MembershipManagerImpl implements
MembershipManager {
transitionTo(MemberState.RECONCILING);
}
} else if (state == MemberState.LEAVING) {
- transitionToUnsubscribed();
+ if (isPollTimerExpired) {
+ transitionToStale();
+ } else {
+ transitionToUnsubscribed();
+ }
}
}
@@ -742,16 +796,37 @@ public class MembershipManagerImpl implements
MembershipManager {
return state == MemberState.PREPARE_LEAVING || state ==
MemberState.LEAVING;
}
+ @Override
+ public void maybeRejoinStaleMember() {
+ isPollTimerExpired = false;
+ if (state == MemberState.STALE) {
+ log.debug("Expired poll timer has been reset so stale member {}
will rejoin the group" +
+ "when it completes releasing its previous assignment.",
memberId);
+ staleMemberAssignmentRelease.whenComplete((__, error) ->
transitionToJoining());
+ }
+ }
+
/**
- * Sets the epoch to the leave group epoch and clears the assignments. The
member will rejoin with
- * the existing subscriptions after the next application poll event.
+ * Transition to STALE to release assignments because the member has left
the group due to
+ * expired poll timer. This will trigger the onPartitionsLost callback.
Once the callback
+ * completes, the member will remain stale until the poll timer is reset
by an application
+ * poll event. See {@link #maybeRejoinStaleMember()}.
*/
- @Override
- public void transitionToStale() {
- memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
- // Clear the current assignment and subscribed partitions before
member sending the leave group
- updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), true);
+ private void transitionToStale() {
transitionTo(MemberState.STALE);
+
+ // Release assignment
+ CompletableFuture<Void> callbackResult =
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+ staleMemberAssignmentRelease = callbackResult.whenComplete((result,
error) -> {
+ if (error != null) {
+ log.error("onPartitionsLost callback invocation failed while
releasing assignment" +
+ " after member left group due to expired poll timer.",
error);
+ }
+ updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR),
true);
+ log.debug("Member {} sent leave group heartbeat and released its
assignment. It will remain " +
+ "in {} state until the poll timer is reset, and it will then
rejoin the group",
+ memberId, MemberState.STALE);
+ });
}
/**
@@ -1184,8 +1259,7 @@ public class MembershipManagerImpl implements
MembershipManager {
}
}
- // Visible for testing
- CompletableFuture<Void> invokeOnPartitionsLostCallback(Set<TopicPartition>
partitionsLost) {
+ private CompletableFuture<Void>
invokeOnPartitionsLostCallback(Set<TopicPartition> partitionsLost) {
// This should not trigger the callback if partitionsLost is empty, to
keep the current
// behaviour.
Optional<ConsumerRebalanceListener> listener =
subscriptions.rebalanceListener();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
index 72a5c0349d2..5cf5b9e2d92 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -71,6 +71,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
@@ -575,20 +576,21 @@ public class HeartbeatRequestManagerTest {
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new
Node(1, "localhost", 9999)));
when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
- // On poll timer expiration, the member should transition to stale and
a last heartbeat
- // should be sent to leave the group
+ // On poll timer expiration, the member should send a last heartbeat
to leave the group
+ // and notify the membership manager
time.sleep(maxPollIntervalMs);
assertHeartbeat(heartbeatRequestManager, heartbeatIntervalMs);
+ verify(membershipManager).transitionToSendingLeaveGroup(true);
verify(heartbeatState).reset();
verify(heartbeatRequestState).reset();
- verify(membershipManager).transitionToStale();
+ verify(membershipManager).onHeartbeatRequestSent();
when(membershipManager.state()).thenReturn(MemberState.STALE);
when(membershipManager.shouldSkipHeartbeat()).thenReturn(true);
assertNoHeartbeat(heartbeatRequestManager);
heartbeatRequestManager.resetPollTimer(time.milliseconds());
assertTrue(pollTimer.notExpired());
- verify(membershipManager).transitionToJoining();
+ verify(membershipManager).maybeRejoinStaleMember();
when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
assertHeartbeat(heartbeatRequestManager, heartbeatIntervalMs);
}
@@ -604,13 +606,13 @@ public class HeartbeatRequestManagerTest {
public void
testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeaving() {
when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
when(membershipManager.isLeavingGroup()).thenReturn(true);
- doNothing().when(membershipManager).transitionToStale();
time.sleep(maxPollIntervalMs);
NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
- // No transition to STALE should be triggered, because the member is
already leaving the group
- verify(membershipManager, never()).transitionToStale();
+ // No transition to leave due to stale member should be triggered,
because the member is
+ // already leaving the group
+ verify(membershipManager,
never()).transitionToSendingLeaveGroup(anyBoolean());
assertEquals(1, result.unsentRequests.size(), "A heartbeat request
should be generated to" +
" complete the ongoing leaving operation that was triggered before
the poll timer expired.");
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index 50f28bb5233..3294068b074 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -36,6 +36,9 @@ import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.ArrayList;
import java.util.Arrays;
@@ -52,6 +55,7 @@ import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static
org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.invokeRebalanceCallbacks;
import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
@@ -59,6 +63,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.common.utils.Utils.mkSortedSet;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -117,23 +122,22 @@ public class MembershipManagerImplTest {
}
private MembershipManagerImpl createMembershipManagerJoiningGroup() {
- MembershipManagerImpl manager = spy(new MembershipManagerImpl(
- GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT,
Optional.empty(),
- subscriptionState, commitRequestManager, metadata, logContext,
Optional.empty(),
- backgroundEventHandler, time));
- manager.transitionToJoining();
- return manager;
+ return createMembershipManagerJoiningGroup(null);
}
private MembershipManagerImpl createMembershipManagerJoiningGroup(String
groupInstanceId) {
- MembershipManagerImpl manager = spy(new MembershipManagerImpl(
- GROUP_ID, Optional.ofNullable(groupInstanceId),
REBALANCE_TIMEOUT, Optional.empty(),
- subscriptionState, commitRequestManager, metadata, logContext,
Optional.empty(),
- backgroundEventHandler, time));
+ MembershipManagerImpl manager =
createMembershipManager(groupInstanceId);
manager.transitionToJoining();
return manager;
}
+ private MembershipManagerImpl createMembershipManager(String
groupInstanceId) {
+ return spy(new MembershipManagerImpl(
+ GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT,
Optional.empty(),
+ subscriptionState, commitRequestManager, metadata, logContext,
Optional.empty(),
+ backgroundEventHandler, time));
+ }
+
private MembershipManagerImpl createMembershipManagerJoiningGroup(String
groupInstanceId,
String
serverAssignor) {
MembershipManagerImpl manager = new MembershipManagerImpl(
@@ -715,6 +719,21 @@ public class MembershipManagerImplTest {
"heartbeat request to leave is sent out.");
}
+ @ParameterizedTest
+ @MethodSource("notInGroupStates")
+ public void testIgnoreHeartbeatResponseWhenNotInGroup(MemberState state) {
+ MembershipManagerImpl membershipManager =
createMembershipManager(null);
+ when(membershipManager.state()).thenReturn(state);
+ ConsumerGroupHeartbeatResponseData responseData =
mock(ConsumerGroupHeartbeatResponseData.class);
+
+ membershipManager.onHeartbeatResponseReceived(responseData);
+
+ assertEquals(state, membershipManager.state());
+ verify(responseData, never()).memberId();
+ verify(responseData, never()).memberEpoch();
+ verify(responseData, never()).assignment();
+ }
+
@Test
public void testLeaveGroupWhenStateIsReconciling() {
MembershipManager membershipManager =
mockJoinAndReceiveAssignment(false);
@@ -803,6 +822,33 @@ public class MembershipManagerImplTest {
verify(subscriptionState,
never()).assignFromSubscribed(Collections.emptySet());
}
+ @Test
+ public void testLeaveGroupWhenMemberFenced() {
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ ConsumerRebalanceListenerInvoker invoker =
consumerRebalanceListenerInvoker();
+ ConsumerRebalanceListenerCallbackCompletedEvent callbackEvent =
mockFencedMemberStuckOnUserCallback(membershipManager, invoker);
+ assertEquals(MemberState.FENCED, membershipManager.state());
+
+ mockLeaveGroup();
+ membershipManager.leaveGroup();
+ assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+ verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+
+ completeCallback(callbackEvent, membershipManager);
+ assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+ }
+
+ @Test
+ public void testLeaveGroupWhenMemberIsStale() {
+ MembershipManagerImpl membershipManager = mockStaleMember();
+ assertEquals(MemberState.STALE, membershipManager.state());
+
+ mockLeaveGroup();
+ CompletableFuture<Void> leaveResult1 = membershipManager.leaveGroup();
+ assertTrue(leaveResult1.isDone());
+ assertEquals(MemberState.STALE, membershipManager.state());
+ }
+
@Test
public void testFatalFailureWhenStateIsUnjoined() {
MembershipManagerImpl membershipManager =
createMembershipManagerJoiningGroup();
@@ -1612,61 +1658,118 @@ public class MembershipManagerImplTest {
testOnPartitionsLost(Optional.of(new KafkaException("Intentional error
for test")));
}
+ private void
assertLeaveGroupDueToExpiredPollAndTransitionToStale(MembershipManagerImpl
membershipManager) {
+ assertDoesNotThrow(() ->
membershipManager.transitionToSendingLeaveGroup(true));
+ assertEquals(LEAVE_GROUP_MEMBER_EPOCH,
membershipManager.memberEpoch());
+ membershipManager.onHeartbeatRequestSent();
+ assertStaleMemberLeavesGroupAndClearsAssignment(membershipManager);
+ }
+
@Test
- public void testTransitionToStaleWhileReconciling() {
+ public void testTransitionToLeavingWhileReconcilingDueToStaleMember() {
MembershipManagerImpl membershipManager = memberJoinWithAssignment();
clearInvocations(subscriptionState);
assertEquals(MemberState.RECONCILING, membershipManager.state());
-
- membershipManager.transitionToStale();
- assertStaleMemberClearsAssignmentsAndLeaves(membershipManager);
+
assertLeaveGroupDueToExpiredPollAndTransitionToStale(membershipManager);
}
@Test
- public void testTransitionToStaleWhileJoining() {
+ public void testTransitionToLeavingWhileJoiningDueToStaleMember() {
MembershipManagerImpl membershipManager =
createMembershipManagerJoiningGroup();
doNothing().when(subscriptionState).assignFromSubscribed(any());
assertEquals(MemberState.JOINING, membershipManager.state());
-
- membershipManager.transitionToStale();
- assertStaleMemberClearsAssignmentsAndLeaves(membershipManager);
+
assertLeaveGroupDueToExpiredPollAndTransitionToStale(membershipManager);
}
@Test
- public void testTransitionToStaleWhileStable() {
+ public void testTransitionToLeavingWhileStableDueToStaleMember() {
MembershipManagerImpl membershipManager =
createMembershipManagerJoiningGroup();
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(null);
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
doNothing().when(subscriptionState).assignFromSubscribed(any());
assertEquals(MemberState.STABLE, membershipManager.state());
-
- membershipManager.transitionToStale();
- assertStaleMemberClearsAssignmentsAndLeaves(membershipManager);
+
assertLeaveGroupDueToExpiredPollAndTransitionToStale(membershipManager);
}
@Test
- public void testTransitionToStaleWhileAcknowledging() {
+ public void testTransitionToLeavingWhileAcknowledgingDueToStaleMember() {
MembershipManagerImpl membershipManager =
mockJoinAndReceiveAssignment(true);
doNothing().when(subscriptionState).assignFromSubscribed(any());
clearInvocations(subscriptionState);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
-
- membershipManager.transitionToStale();
- assertStaleMemberClearsAssignmentsAndLeaves(membershipManager);
+
assertLeaveGroupDueToExpiredPollAndTransitionToStale(membershipManager);
}
@Test
public void
testStaleMemberDoesNotSendHeartbeatAndAllowsTransitionToJoiningToRecover() {
MembershipManagerImpl membershipManager = createMemberInStableState();
doNothing().when(subscriptionState).assignFromSubscribed(any());
- membershipManager.transitionToStale();
- assertTrue(membershipManager.shouldSkipHeartbeat(), "Stale member
should not send " +
- "heartbeats");
+ membershipManager.transitionToSendingLeaveGroup(true);
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.STALE, membershipManager.state());
+ assertTrue(membershipManager.shouldSkipHeartbeat(), "Stale member
should not send heartbeats");
// Check that a transition to joining is allowed, which is what is
expected to happen
// when the poll timer is reset.
- membershipManager.transitionToJoining();
+ assertDoesNotThrow(membershipManager::maybeRejoinStaleMember);
+ }
+
+ @Test
+ public void testStaleMemberRejoinsWhenTimerResetsNoCallbacks() {
+ MembershipManagerImpl membershipManager = mockStaleMember();
+ assertStaleMemberLeavesGroupAndClearsAssignment(membershipManager);
+
+ membershipManager.maybeRejoinStaleMember();
+ assertEquals(MemberState.JOINING, membershipManager.state());
}
+ @Test
+ public void testStaleMemberWaitsForCallbackToRejoinWhenTimerReset() {
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "topic1";
+ int ownedPartition = 0;
+ TopicPartition tp = new TopicPartition(topicName, ownedPartition);
+ mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId,
topicName,
+ Collections.singletonList(new TopicIdPartition(topicId, tp)));
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener();
+ ConsumerRebalanceListenerInvoker invoker =
consumerRebalanceListenerInvoker();
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+
+ membershipManager.transitionToSendingLeaveGroup(true);
+ membershipManager.onHeartbeatRequestSent();
+
+ assertEquals(MemberState.STALE, membershipManager.state());
+
verify(backgroundEventHandler).add(any(ConsumerRebalanceListenerCallbackNeededEvent.class));
+
+ // Stale member triggers onPartitionLost callback that will not
complete just yet
+ ConsumerRebalanceListenerCallbackCompletedEvent callbackEvent =
performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST,
+ topicPartitions(topicName, ownedPartition),
+ false
+ );
+
+ // Timer reset while callback hasn't completed. Member should stay in
STALE while it
+ // completes releasing its assignment, and then transition to joining.
+ membershipManager.maybeRejoinStaleMember();
+ assertEquals(MemberState.STALE, membershipManager.state(), "Member
should not transition " +
+ "out of the STALE state when the timer is reset if the callback
has not completed.");
+ // Member should not clear its assignment to rejoin until the callback
completes
+ verify(subscriptionState, never()).assignFromSubscribed(any());
+
+ completeCallback(callbackEvent, membershipManager);
+ assertEquals(MemberState.JOINING, membershipManager.state());
+ verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+ }
+
+ private MembershipManagerImpl mockStaleMember() {
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ doNothing().when(subscriptionState).assignFromSubscribed(any());
+ membershipManager.transitionToSendingLeaveGroup(true);
+ membershipManager.onHeartbeatRequestSent();
+ return membershipManager;
+ }
private void mockPartitionOwnedAndNewPartitionAdded(String topicName,
int partitionOwned,
int partitionAdded,
@@ -1814,30 +1917,17 @@ public class MembershipManagerImplTest {
verify(subscriptionState, never()).rebalanceListener();
}
- private void
assertStaleMemberClearsAssignmentsAndLeaves(MembershipManagerImpl
membershipManager) {
+ /**
+ * Asserts that the member is STALE and has released its assignment: assignFromSubscribed
+ * was called on subscriptionState with an empty set, the current assignment and the
+ * topics awaiting reconciliation are empty, and the member epoch is LEAVE_GROUP_MEMBER_EPOCH.
+ */
+ private void
assertStaleMemberLeavesGroupAndClearsAssignment(MembershipManagerImpl
membershipManager) {
assertEquals(MemberState.STALE, membershipManager.state());
- // Should clear subscriptions, current assignments, and reset epoch to
leave the group
+ // Should reset epoch to leave the group and release the assignment
(right away because
+ // there is no onPartitionsLost callback defined)
verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
assertTrue(membershipManager.currentAssignment().isEmpty());
assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty());
assertEquals(LEAVE_GROUP_MEMBER_EPOCH,
membershipManager.memberEpoch());
}
- @Test
- public void testHeartbeatSentOnStaleMember() {
- MembershipManagerImpl membershipManager = createMemberInStableState();
- subscriptionState.subscribe(Collections.singleton("topic"),
Optional.empty());
- subscriptionState.assignFromSubscribed(Collections.singleton(new
TopicPartition("topic", 0)));
- membershipManager.transitionToStale();
- membershipManager.onHeartbeatRequestSent();
- // Member should remain in STALE state. Only when the poll timer is
reset the member will
- // transition to JOINING.
- assertEquals(MemberState.STALE, membershipManager.state());
- assertTrue(membershipManager.currentAssignment().isEmpty());
- assertTrue(subscriptionState.assignedPartitions().isEmpty());
- }
-
@Test
public void
testMemberJoiningTransitionsToStableWhenReceivingEmptyAssignment() {
MembershipManagerImpl membershipManager =
createMembershipManagerJoiningGroup(null);
@@ -2160,6 +2250,29 @@ public class MembershipManagerImplTest {
);
}
+ /**
+ * Fences a member that owns partition topic1-0 while a CounterConsumerRebalanceListener is
+ * registered, and starts the onPartitionsLost callback for that partition without
+ * completing it (performCallback is invoked with {@code false}).
+ *
+ * @param membershipManager the member to fence
+ * @param invoker invoker used to run the rebalance listener callback
+ * @return the callback-completed event; pass it to completeCallback to unblock the member
+ */
+ private ConsumerRebalanceListenerCallbackCompletedEvent mockFencedMemberStuckOnUserCallback(
+ MembershipManagerImpl membershipManager,
+ ConsumerRebalanceListenerInvoker invoker) {
+ String topicName = "topic1";
+ TopicPartition ownedPartition = new TopicPartition(topicName, 0);
+
+ // Fence member and block waiting for onPartitionsLost callback to complete
+ CounterConsumerRebalanceListener listener = new CounterConsumerRebalanceListener();
+ when(subscriptionState.assignedPartitions()).thenReturn(Collections.singleton(ownedPartition));
+ when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+ when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ when(commitRequestManager.autoCommitEnabled()).thenReturn(false);
+ membershipManager.transitionToFenced();
+ return performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST,
+ topicPartitions(ownedPartition.topic(), ownedPartition.partition()),
+ false
+ );
+ }
+
private void testStateUpdateOnFatalFailure(MembershipManagerImpl
membershipManager) {
String memberId = membershipManager.memberId();
int lastEpoch = membershipManager.memberEpoch();
@@ -2231,4 +2344,16 @@ public class MembershipManagerImplTest {
assertFalse(membershipManager.currentAssignment().isEmpty());
return membershipManager;
}
+
+ /**
+ * Supplies, as one JUnit {@link Arguments} set each, the member states in which the member
+ * is not part of the group.
+ *
+ * @return UNSUBSCRIBED, FENCED, FATAL and STALE, in that order
+ */
+ private static Stream<Arguments> notInGroupStates() {
+ return Stream.of(MemberState.UNSUBSCRIBED, MemberState.FENCED, MemberState.FATAL, MemberState.STALE)
+ .map(Arguments::of);
+ }
+
}
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index c826b8fd0c4..01dbb206a07 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -169,10 +169,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
startingTimestamp = startingTimestamp)
}
- // TODO: Enable this test for both protocols when the Jira tracking its
failure (KAFKA-16008) is fixed. This
- // is done by setting the @MethodSource value to
"getTestQuorumAndGroupProtocolParametersAll"
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testMaxPollIntervalMs(quorum: String, groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
1000.toString)
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
500.toString)