This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9bc9fae9425 KAFKA-16258: callback to release assignment when stale 
member leaves group (#15415)
9bc9fae9425 is described below

commit 9bc9fae9425e4dac64ef078cd3a4e7e6e09cc45a
Author: Lianet Magrans <[email protected]>
AuthorDate: Mon Feb 26 05:39:33 2024 -0500

    KAFKA-16258: callback to release assignment when stale member leaves group 
(#15415)
    
    Introduce call to onPartitionsLost callback to release assignment when a 
consumer pro-actively leaves the group due to poll timer expired.
    
    When the poll timer expires, the member sends a leave group request 
(reusing same existing LEAVING state and logic), and then transitions to STALE 
to release it assignment and wait for the poll timer reset. Once both 
conditions are met, the consumer transitions out of the STALE state to rejoin 
the group. Note that while on this STALE state, the member is not part of the 
group so it does not send heartbeats.
    
    This PR also includes the fix to ensure that while STALE or in any other 
state where the member is not in the group, heartbeat responses that may be 
received are ignored.
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../internals/HeartbeatRequestManager.java         |  15 +-
 .../clients/consumer/internals/MemberState.java    |  17 +-
 .../consumer/internals/MembershipManager.java      |  11 +-
 .../consumer/internals/MembershipManagerImpl.java  | 112 +++++++++--
 .../internals/HeartbeatRequestManagerTest.java     |  16 +-
 .../internals/MembershipManagerImplTest.java       | 217 ++++++++++++++++-----
 .../kafka/api/PlaintextConsumerTest.scala          |   4 +-
 7 files changed, 299 insertions(+), 93 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
index 2aabcfe20bc..550e5b92ebf 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -200,13 +200,13 @@ public class HeartbeatRequestManager implements 
RequestManager {
                 "messages. You can address this either by increasing 
max.poll.interval.ms or by " +
                 "reducing the maximum size of batches returned in poll() with 
max.poll.records.");
 
-            // This should trigger a heartbeat with leave group epoch
-            membershipManager.transitionToStale();
-            NetworkClientDelegate.UnsentRequest request = 
makeHeartbeatRequest(currentTimeMs, true);
+            membershipManager.transitionToSendingLeaveGroup(true);
+            NetworkClientDelegate.UnsentRequest leaveHeartbeat = 
makeHeartbeatRequest(currentTimeMs, true);
+
             // We can ignore the leave response because we can join before or 
after receiving the response.
             heartbeatRequestState.reset();
             heartbeatState.reset();
-            return new 
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, 
Collections.singletonList(request));
+            return new 
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, 
Collections.singletonList(leaveHeartbeat));
         }
 
         boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && 
!heartbeatRequestState.requestInFlight();
@@ -256,11 +256,12 @@ public class HeartbeatRequestManager implements 
RequestManager {
      * member to {@link MemberState#JOINING}, so that it rejoins the group.
      */
     public void resetPollTimer(final long pollMs) {
+        if (pollTimer.isExpired()) {
+            logger.debug("Poll timer has been reset after it had expired");
+            membershipManager.maybeRejoinStaleMember();
+        }
         pollTimer.update(pollMs);
         pollTimer.reset(maxPollIntervalMs);
-        if (membershipManager.state() == MemberState.STALE) {
-            membershipManager.transitionToJoining();
-        }
     }
 
     private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final 
long currentTimeMs,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
index 1df4d30e594..9f0c7d947ea 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
@@ -79,11 +79,12 @@ public enum MemberState {
     FENCED,
 
     /**
-     * The member transitions to this state after a call to unsubscribe. While 
in this state, the
-     * member will stop sending heartbeats, will commit offsets if needed and 
release its
-     * assignment (calling user's callback for partitions revoked or lost). 
When all these
-     * actions complete, the member will transition out of this state into 
{@link #LEAVING} to
-     * effectively leave the group.
+     * The member transitions to this state before sending a heartbeat to 
leave the group,
+     * While in this state, the member will continue sending heartbeats while 
it release its
+     * assignment calling the user's callback. When callbacks complete, the 
member will transition
+     * out of this state into {@link #LEAVING} to send a heartbeat to leave 
the group. Note that
+     * if leaving due to expired poll timer, the member does not execute any 
callbacks while in
+     * this state and just transitions to {@link #LEAVING} and then {@link 
#STALE}
      */
     PREPARE_LEAVING,
 
@@ -130,13 +131,13 @@ public enum MemberState {
         JOINING.previousValidStates = Arrays.asList(FENCED, UNSUBSCRIBED, 
STALE);
 
         PREPARE_LEAVING.previousValidStates = Arrays.asList(JOINING, STABLE, 
RECONCILING,
-                ACKNOWLEDGING, UNSUBSCRIBED, FENCED);
+                ACKNOWLEDGING, UNSUBSCRIBED);
 
         LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING);
 
-        UNSUBSCRIBED.previousValidStates = Arrays.asList(PREPARE_LEAVING, 
LEAVING);
+        UNSUBSCRIBED.previousValidStates = Arrays.asList(PREPARE_LEAVING, 
LEAVING, FENCED);
 
-        STALE.previousValidStates = Arrays.asList(JOINING, RECONCILING, 
ACKNOWLEDGING, STABLE);
+        STALE.previousValidStates = Arrays.asList(LEAVING);
     }
 
     private List<MemberState> previousValidStates;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
index f95552cc3a3..a9c23d7b4d5 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
@@ -159,10 +159,9 @@ public interface MembershipManager extends RequestManager {
     void transitionToJoining();
 
     /**
-     * When the user stops polling the consumer and the 
<code>max.poll.interval.ms</code> timer expires, we transition
-     * the member to STALE.
+     * Transition to the {@link MemberState#LEAVING} state to send a heartbeat 
to leave the group.
      */
-    void transitionToStale();
+    void transitionToSendingLeaveGroup(boolean dueToPollTimerExpired);
 
     /**
      * Register a listener that will be called whenever the member state 
changes due to
@@ -175,4 +174,10 @@ public interface MembershipManager extends RequestManager {
      * leaving (sending last heartbeat).
      */
     boolean isLeavingGroup();
+
+    /**
+     * Transition a {@link MemberState#STALE} member to {@link 
MemberState#JOINING} when it completes
+     * releasing its assignment. This is expected to be used when the poll 
timer is reset.
+     */
+    void maybeRejoinStaleMember();
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index 81e65dfd866..a9b0f3b94d8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -257,8 +257,23 @@ public class MembershipManagerImpl implements 
MembershipManager {
      */
     private final BackgroundEventHandler backgroundEventHandler;
 
+    /**
+     * Future that will complete when a stale member completes releasing its 
assignment after
+     * leaving the group due to poll timer expired. Used to make sure that the 
member rejoins
+     * when the timer is reset, only when it completes releasing its 
assignment.
+     */
+    private CompletableFuture<Void> staleMemberAssignmentRelease;
+
     private final Time time;
 
+    /**
+     * True if the poll timer has expired, signaled by a call to
+     * {@link #transitionToSendingLeaveGroup(boolean)} with 
dueToExpiredPollTimer param true. This
+     * will be used to determine that the member should transition to STALE 
after leaving the
+     * group, to release its assignment and wait for the timer to be reset.
+     */
+    private boolean isPollTimerExpired;
+
     public MembershipManagerImpl(String groupId,
                                  Optional<String> groupInstanceId,
                                  int rebalanceTimeoutMs,
@@ -347,11 +362,17 @@ public class MembershipManagerImpl implements 
MembershipManager {
             );
             throw new IllegalArgumentException(errorMessage);
         }
+        MemberState state = state();
         if (state == MemberState.LEAVING) {
             log.debug("Ignoring heartbeat response received from broker. 
Member {} with epoch {} is " +
                     "already leaving the group.", memberId, memberEpoch);
             return;
         }
+        if (isNotInGroup()) {
+            log.debug("Ignoring heartbeat response received from broker. 
Member {} is in {} state" +
+                " so it's not a member of the group. ", memberId, state);
+            return;
+        }
 
         // Update the group member id label in the client telemetry reporter 
if the member id has
         // changed. Initially the member id is empty, and it is updated when 
the member joins the
@@ -382,6 +403,16 @@ public class MembershipManagerImpl implements 
MembershipManager {
         }
     }
 
+    /**
+     * @return True if the consumer is not a member of the group.
+     */
+    private boolean isNotInGroup() {
+        return state == MemberState.UNSUBSCRIBED ||
+            state == MemberState.FENCED ||
+            state == MemberState.FATAL ||
+            state == MemberState.STALE;
+    }
+
     /**
      * This will process the assignment received if it is different from the 
member's current
      * assignment. If a new assignment is received, this will make sure 
reconciliation is attempted
@@ -462,7 +493,12 @@ public class MembershipManagerImpl implements 
MembershipManager {
                         " after member got fenced. Member will rejoin the 
group anyways.", error);
             }
             updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), 
true);
-            transitionToJoining();
+            if (state == MemberState.FENCED) {
+                transitionToJoining();
+            } else {
+                log.debug("Fenced member onPartitionsLost callback completed 
but the state has " +
+                    "already changed to {}, so the member won't rejoin the 
group", state);
+            }
         });
     }
 
@@ -564,9 +600,11 @@ public class MembershipManagerImpl implements 
MembershipManager {
      */
     @Override
     public CompletableFuture<Void> leaveGroup() {
-        if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) {
-            // Member is not part of the group. No-op and return completed 
future to avoid
-            // unnecessary transitions.
+        if (isNotInGroup()) {
+            if (state == MemberState.FENCED) {
+                updateSubscription(new 
TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), true);
+                transitionTo(MemberState.UNSUBSCRIBED);
+            }
             return CompletableFuture.completedFuture(null);
         }
 
@@ -588,7 +626,7 @@ public class MembershipManagerImpl implements 
MembershipManager {
             // Transition to ensure that a heartbeat request is sent out to 
effectively leave the
             // group (even in the case where the member had no assignment to 
release or when the
             // callback execution failed.)
-            transitionToSendingLeaveGroup();
+            transitionToSendingLeaveGroup(false);
         });
 
         // Return future to indicate that the leave group is done when the 
callbacks
@@ -633,11 +671,15 @@ public class MembershipManagerImpl implements 
MembershipManager {
 
     /**
      * Reset member epoch to the value required for the leave the group 
heartbeat request, and
-     * transition to the {@link MemberState#LEAVING} state so that a heartbeat
-     * request is sent out with it.
-     * Visible for testing.
+     * transition to the {@link MemberState#LEAVING} state so that a heartbeat 
request is sent
+     * out with it.
+     *
+     * @param dueToExpiredPollTimer True if the leave group is due to an 
expired poll timer. This
+     *                              will indicate that the member must remain 
STALE after leaving,
+     *                              until it releases its assignment and the 
timer is reset.
      */
-    void transitionToSendingLeaveGroup() {
+    @Override
+    public void transitionToSendingLeaveGroup(boolean dueToExpiredPollTimer) {
         if (state == MemberState.FATAL) {
             log.warn("Member {} with epoch {} won't send leave group request 
because it is in " +
                     "FATAL state", memberId, memberEpoch);
@@ -648,6 +690,14 @@ public class MembershipManagerImpl implements 
MembershipManager {
                 memberId);
             return;
         }
+
+        if (dueToExpiredPollTimer) {
+            this.isPollTimerExpired = true;
+            // Briefly transition through prepare leaving. The member does not 
have to release
+            // any assignment before sending the leave group given that is 
stale. It will invoke
+            // onPartitionsLost after sending the leave group on the STALE 
state.
+            transitionTo(MemberState.PREPARE_LEAVING);
+        }
         int leaveEpoch = groupInstanceId.isPresent() ?
                 ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH :
                 ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
@@ -690,7 +740,11 @@ public class MembershipManagerImpl implements 
MembershipManager {
                 transitionTo(MemberState.RECONCILING);
             }
         } else if (state == MemberState.LEAVING) {
-            transitionToUnsubscribed();
+            if (isPollTimerExpired) {
+                transitionToStale();
+            } else {
+                transitionToUnsubscribed();
+            }
         }
     }
 
@@ -742,16 +796,37 @@ public class MembershipManagerImpl implements 
MembershipManager {
         return state == MemberState.PREPARE_LEAVING || state == 
MemberState.LEAVING;
     }
 
+    @Override
+    public void maybeRejoinStaleMember() {
+        isPollTimerExpired = false;
+        if (state == MemberState.STALE) {
+            log.debug("Expired poll timer has been reset so stale member {} 
will rejoin the group" +
+                "when it completes releasing its previous assignment.", 
memberId);
+            staleMemberAssignmentRelease.whenComplete((__, error) -> 
transitionToJoining());
+        }
+    }
+
     /**
-     * Sets the epoch to the leave group epoch and clears the assignments. The 
member will rejoin with
-     * the existing subscriptions after the next application poll event.
+     * Transition to STALE to release assignments because the member has left 
the group due to
+     * expired poll timer. This will trigger the onPartitionsLost callback. 
Once the callback
+     * completes, the member will remain stale until the poll timer is reset 
by an application
+     * poll event. See {@link #maybeRejoinStaleMember()}.
      */
-    @Override
-    public void transitionToStale() {
-        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
-        // Clear the current assignment and subscribed partitions before 
member sending the leave group
-        updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), true);
+    private void transitionToStale() {
         transitionTo(MemberState.STALE);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        staleMemberAssignmentRelease = callbackResult.whenComplete((result, 
error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+                    " after member left group due to expired poll timer.", 
error);
+            }
+            updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), 
true);
+            log.debug("Member {} sent leave group heartbeat and released its 
assignment. It will remain " +
+                "in {} state until the poll timer is reset, and it will then 
rejoin the group",
+                memberId, MemberState.STALE);
+        });
     }
 
     /**
@@ -1184,8 +1259,7 @@ public class MembershipManagerImpl implements 
MembershipManager {
         }
     }
 
-    // Visible for testing
-    CompletableFuture<Void> invokeOnPartitionsLostCallback(Set<TopicPartition> 
partitionsLost) {
+    private CompletableFuture<Void> 
invokeOnPartitionsLostCallback(Set<TopicPartition> partitionsLost) {
         // This should not trigger the callback if partitionsLost is empty, to 
keep the current
         // behaviour.
         Optional<ConsumerRebalanceListener> listener = 
subscriptions.rebalanceListener();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
index 72a5c0349d2..5cf5b9e2d92 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -71,6 +71,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
@@ -575,20 +576,21 @@ public class HeartbeatRequestManagerTest {
         
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", 9999)));
         when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
 
-        // On poll timer expiration, the member should transition to stale and 
a last heartbeat 
-        // should be sent to leave the group
+        // On poll timer expiration, the member should send a last heartbeat 
to leave the group
+        // and notify the membership manager
         time.sleep(maxPollIntervalMs);
         assertHeartbeat(heartbeatRequestManager, heartbeatIntervalMs);
+        verify(membershipManager).transitionToSendingLeaveGroup(true);
         verify(heartbeatState).reset();
         verify(heartbeatRequestState).reset();
-        verify(membershipManager).transitionToStale();
+        verify(membershipManager).onHeartbeatRequestSent();
 
         when(membershipManager.state()).thenReturn(MemberState.STALE);
         when(membershipManager.shouldSkipHeartbeat()).thenReturn(true);
         assertNoHeartbeat(heartbeatRequestManager);
         heartbeatRequestManager.resetPollTimer(time.milliseconds());
         assertTrue(pollTimer.notExpired());
-        verify(membershipManager).transitionToJoining();
+        verify(membershipManager).maybeRejoinStaleMember();
         when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
         assertHeartbeat(heartbeatRequestManager, heartbeatIntervalMs);
     }
@@ -604,13 +606,13 @@ public class HeartbeatRequestManagerTest {
     public void 
testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeaving() {
         when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
         when(membershipManager.isLeavingGroup()).thenReturn(true);
-        doNothing().when(membershipManager).transitionToStale();
 
         time.sleep(maxPollIntervalMs);
         NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
 
-        // No transition to STALE should be triggered, because the member is 
already leaving the group
-        verify(membershipManager, never()).transitionToStale();
+        // No transition to leave due to stale member should be triggered, 
because the member is
+        // already leaving the group
+        verify(membershipManager, 
never()).transitionToSendingLeaveGroup(anyBoolean());
 
         assertEquals(1, result.unsentRequests.size(), "A heartbeat request 
should be generated to" +
             " complete the ongoing leaving operation that was triggered before 
the poll timer expired.");
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index 50f28bb5233..3294068b074 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -36,6 +36,9 @@ import org.apache.kafka.common.utils.Time;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -52,6 +55,7 @@ import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static 
org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.invokeRebalanceCallbacks;
 import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
@@ -59,6 +63,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.common.utils.Utils.mkSortedSet;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -117,23 +122,22 @@ public class MembershipManagerImplTest {
     }
 
     private MembershipManagerImpl createMembershipManagerJoiningGroup() {
-        MembershipManagerImpl manager = spy(new MembershipManagerImpl(
-                GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, 
Optional.empty(),
-                subscriptionState, commitRequestManager, metadata, logContext, 
Optional.empty(),
-                backgroundEventHandler, time));
-        manager.transitionToJoining();
-        return manager;
+        return createMembershipManagerJoiningGroup(null);
     }
 
     private MembershipManagerImpl createMembershipManagerJoiningGroup(String 
groupInstanceId) {
-        MembershipManagerImpl manager = spy(new MembershipManagerImpl(
-                GROUP_ID, Optional.ofNullable(groupInstanceId), 
REBALANCE_TIMEOUT, Optional.empty(),
-                subscriptionState, commitRequestManager, metadata, logContext, 
Optional.empty(),
-                backgroundEventHandler, time));
+        MembershipManagerImpl manager = 
createMembershipManager(groupInstanceId);
         manager.transitionToJoining();
         return manager;
     }
 
+    private MembershipManagerImpl createMembershipManager(String 
groupInstanceId) {
+        return spy(new MembershipManagerImpl(
+            GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT, 
Optional.empty(),
+            subscriptionState, commitRequestManager, metadata, logContext, 
Optional.empty(),
+            backgroundEventHandler, time));
+    }
+
     private MembershipManagerImpl createMembershipManagerJoiningGroup(String 
groupInstanceId,
                                                                       String 
serverAssignor) {
         MembershipManagerImpl manager = new MembershipManagerImpl(
@@ -715,6 +719,21 @@ public class MembershipManagerImplTest {
             "heartbeat request to leave is sent out.");
     }
 
+    @ParameterizedTest
+    @MethodSource("notInGroupStates")
+    public void testIgnoreHeartbeatResponseWhenNotInGroup(MemberState state) {
+        MembershipManagerImpl membershipManager = 
createMembershipManager(null);
+        when(membershipManager.state()).thenReturn(state);
+        ConsumerGroupHeartbeatResponseData responseData = 
mock(ConsumerGroupHeartbeatResponseData.class);
+
+        membershipManager.onHeartbeatResponseReceived(responseData);
+
+        assertEquals(state, membershipManager.state());
+        verify(responseData, never()).memberId();
+        verify(responseData, never()).memberEpoch();
+        verify(responseData, never()).assignment();
+    }
+
     @Test
     public void testLeaveGroupWhenStateIsReconciling() {
         MembershipManager membershipManager = 
mockJoinAndReceiveAssignment(false);
@@ -803,6 +822,33 @@ public class MembershipManagerImplTest {
         verify(subscriptionState, 
never()).assignFromSubscribed(Collections.emptySet());
     }
 
+    @Test
+    public void testLeaveGroupWhenMemberFenced() {
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        ConsumerRebalanceListenerInvoker invoker = 
consumerRebalanceListenerInvoker();
+        ConsumerRebalanceListenerCallbackCompletedEvent callbackEvent = 
mockFencedMemberStuckOnUserCallback(membershipManager, invoker);
+        assertEquals(MemberState.FENCED, membershipManager.state());
+
+        mockLeaveGroup();
+        membershipManager.leaveGroup();
+        assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+
+        completeCallback(callbackEvent, membershipManager);
+        assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+    }
+
+    @Test
+    public void testLeaveGroupWhenMemberIsStale() {
+        MembershipManagerImpl membershipManager = mockStaleMember();
+        assertEquals(MemberState.STALE, membershipManager.state());
+
+        mockLeaveGroup();
+        CompletableFuture<Void> leaveResult1 = membershipManager.leaveGroup();
+        assertTrue(leaveResult1.isDone());
+        assertEquals(MemberState.STALE, membershipManager.state());
+    }
+
     @Test
     public void testFatalFailureWhenStateIsUnjoined() {
         MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
@@ -1612,61 +1658,118 @@ public class MembershipManagerImplTest {
         testOnPartitionsLost(Optional.of(new KafkaException("Intentional error 
for test")));
     }
 
+    private void 
assertLeaveGroupDueToExpiredPollAndTransitionToStale(MembershipManagerImpl 
membershipManager) {
+        assertDoesNotThrow(() -> 
membershipManager.transitionToSendingLeaveGroup(true));
+        assertEquals(LEAVE_GROUP_MEMBER_EPOCH, 
membershipManager.memberEpoch());
+        membershipManager.onHeartbeatRequestSent();
+        assertStaleMemberLeavesGroupAndClearsAssignment(membershipManager);
+    }
+
     @Test
-    public void testTransitionToStaleWhileReconciling() {
+    public void testTransitionToLeavingWhileReconcilingDueToStaleMember() {
         MembershipManagerImpl membershipManager = memberJoinWithAssignment();
         clearInvocations(subscriptionState);
         assertEquals(MemberState.RECONCILING, membershipManager.state());
-
-        membershipManager.transitionToStale();
-        assertStaleMemberClearsAssignmentsAndLeaves(membershipManager);
+        
assertLeaveGroupDueToExpiredPollAndTransitionToStale(membershipManager);
     }
 
     @Test
-    public void testTransitionToStaleWhileJoining() {
+    public void testTransitionToLeavingWhileJoiningDueToStaleMember() {
         MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
         doNothing().when(subscriptionState).assignFromSubscribed(any());
         assertEquals(MemberState.JOINING, membershipManager.state());
-
-        membershipManager.transitionToStale();
-        assertStaleMemberClearsAssignmentsAndLeaves(membershipManager);
+        
assertLeaveGroupDueToExpiredPollAndTransitionToStale(membershipManager);
     }
 
     @Test
-    public void testTransitionToStaleWhileStable() {
+    public void testTransitionToLeavingWhileStableDueToStaleMember() {
         MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
         ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(null);
         
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
         doNothing().when(subscriptionState).assignFromSubscribed(any());
         assertEquals(MemberState.STABLE, membershipManager.state());
-
-        membershipManager.transitionToStale();
-        assertStaleMemberClearsAssignmentsAndLeaves(membershipManager);
+        
assertLeaveGroupDueToExpiredPollAndTransitionToStale(membershipManager);
     }
 
     @Test
-    public void testTransitionToStaleWhileAcknowledging() {
+    public void testTransitionToLeavingWhileAcknowledgingDueToStaleMember() {
         MembershipManagerImpl membershipManager = 
mockJoinAndReceiveAssignment(true);
         doNothing().when(subscriptionState).assignFromSubscribed(any());
         clearInvocations(subscriptionState);
         assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
-
-        membershipManager.transitionToStale();
-        assertStaleMemberClearsAssignmentsAndLeaves(membershipManager);
+        
assertLeaveGroupDueToExpiredPollAndTransitionToStale(membershipManager);
     }
 
     @Test
     public void 
testStaleMemberDoesNotSendHeartbeatAndAllowsTransitionToJoiningToRecover() {
         MembershipManagerImpl membershipManager = createMemberInStableState();
         doNothing().when(subscriptionState).assignFromSubscribed(any());
-        membershipManager.transitionToStale();
-        assertTrue(membershipManager.shouldSkipHeartbeat(), "Stale member 
should not send " +
-            "heartbeats");
+        membershipManager.transitionToSendingLeaveGroup(true);
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.STALE, membershipManager.state());
+        assertTrue(membershipManager.shouldSkipHeartbeat(), "Stale member 
should not send heartbeats");
         // Check that a transition to joining is allowed, which is what is 
expected to happen
         // when the poll timer is reset.
-        membershipManager.transitionToJoining();
+        assertDoesNotThrow(membershipManager::maybeRejoinStaleMember);
+    }
+
+    @Test
+    public void testStaleMemberRejoinsWhenTimerResetsNoCallbacks() {
+        MembershipManagerImpl membershipManager = mockStaleMember();
+        assertStaleMemberLeavesGroupAndClearsAssignment(membershipManager);
+
+        membershipManager.maybeRejoinStaleMember();
+        assertEquals(MemberState.JOINING, membershipManager.state());
     }
 
+    @Test
+    public void testStaleMemberWaitsForCallbackToRejoinWhenTimerReset() {
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "topic1";
+        int ownedPartition = 0;
+        TopicPartition tp = new TopicPartition(topicName, ownedPartition);
+        mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, 
topicName,
+            Collections.singletonList(new TopicIdPartition(topicId, tp)));
+        CounterConsumerRebalanceListener listener = new 
CounterConsumerRebalanceListener();
+        ConsumerRebalanceListenerInvoker invoker = 
consumerRebalanceListenerInvoker();
+        
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+
+        membershipManager.transitionToSendingLeaveGroup(true);
+        membershipManager.onHeartbeatRequestSent();
+
+        assertEquals(MemberState.STALE, membershipManager.state());
+        
verify(backgroundEventHandler).add(any(ConsumerRebalanceListenerCallbackNeededEvent.class));
+
+        // Stale member triggers onPartitionLost callback that will not 
complete just yet
+        ConsumerRebalanceListenerCallbackCompletedEvent callbackEvent = 
performCallback(
+            membershipManager,
+            invoker,
+            ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST,
+            topicPartitions(topicName, ownedPartition),
+            false
+        );
+
+        // Timer reset while callback hasn't completed. Member should stay in 
STALE while it
+        // completes releasing its assignment, and then transition to joining.
+        membershipManager.maybeRejoinStaleMember();
+        assertEquals(MemberState.STALE, membershipManager.state(), "Member 
should not transition " +
+            "out of the STALE state when the timer is reset if the callback 
has not completed.");
+        // Member should not clear its assignment to rejoin until the callback 
completes
+        verify(subscriptionState, never()).assignFromSubscribed(any());
+
+        completeCallback(callbackEvent, membershipManager);
+        assertEquals(MemberState.JOINING, membershipManager.state());
+        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+    }
+
+    private MembershipManagerImpl mockStaleMember() {
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        doNothing().when(subscriptionState).assignFromSubscribed(any());
+        membershipManager.transitionToSendingLeaveGroup(true);
+        membershipManager.onHeartbeatRequestSent();
+        return membershipManager;
+    }
     private void mockPartitionOwnedAndNewPartitionAdded(String topicName,
                                                         int partitionOwned,
                                                         int partitionAdded,
@@ -1814,30 +1917,17 @@ public class MembershipManagerImplTest {
         verify(subscriptionState, never()).rebalanceListener();
     }
 
-    private void 
assertStaleMemberClearsAssignmentsAndLeaves(MembershipManagerImpl 
membershipManager) {
+    private void 
assertStaleMemberLeavesGroupAndClearsAssignment(MembershipManagerImpl 
membershipManager) {
         assertEquals(MemberState.STALE, membershipManager.state());
 
-        // Should clear subscriptions, current assignments, and reset epoch to 
leave the group
+        // Should reset epoch to leave the group and release the assignment 
(right away because
+        // there is no onPartitionsLost callback defined)
         verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
         assertTrue(membershipManager.currentAssignment().isEmpty());
         assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty());
         assertEquals(LEAVE_GROUP_MEMBER_EPOCH, 
membershipManager.memberEpoch());
     }
 
-    @Test
-    public void testHeartbeatSentOnStaleMember() {
-        MembershipManagerImpl membershipManager = createMemberInStableState();
-        subscriptionState.subscribe(Collections.singleton("topic"), 
Optional.empty());
-        subscriptionState.assignFromSubscribed(Collections.singleton(new 
TopicPartition("topic", 0)));
-        membershipManager.transitionToStale();
-        membershipManager.onHeartbeatRequestSent();
-        // Member should remain in STALE state. Only when the poll timer is 
reset the member will
-        // transition to JOINING.
-        assertEquals(MemberState.STALE, membershipManager.state());
-        assertTrue(membershipManager.currentAssignment().isEmpty());
-        assertTrue(subscriptionState.assignedPartitions().isEmpty());
-    }
-
     @Test
     public void 
testMemberJoiningTransitionsToStableWhenReceivingEmptyAssignment() {
         MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup(null);
@@ -2160,6 +2250,29 @@ public class MembershipManagerImplTest {
         );
     }
 
+    private ConsumerRebalanceListenerCallbackCompletedEvent 
mockFencedMemberStuckOnUserCallback(
+        MembershipManagerImpl membershipManager,
+        ConsumerRebalanceListenerInvoker invoker) {
+        String topicName = "topic1";
+        TopicPartition ownedPartition = new TopicPartition(topicName, 0);
+
+        // Fence member and block waiting for onPartitionsLost callback to 
complete
+        CounterConsumerRebalanceListener listener = new 
CounterConsumerRebalanceListener();
+        
when(subscriptionState.assignedPartitions()).thenReturn(Collections.singleton(ownedPartition));
+        when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+        
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+        // doNothing().when(subscriptionState).markPendingRevocation(anySet());
+        when(commitRequestManager.autoCommitEnabled()).thenReturn(false);
+        membershipManager.transitionToFenced();
+        return performCallback(
+            membershipManager,
+            invoker,
+            ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST,
+            topicPartitions(ownedPartition.topic(), 
ownedPartition.partition()),
+            false
+        );
+    }
+
     private void testStateUpdateOnFatalFailure(MembershipManagerImpl 
membershipManager) {
         String memberId = membershipManager.memberId();
         int lastEpoch = membershipManager.memberEpoch();
@@ -2231,4 +2344,16 @@ public class MembershipManagerImplTest {
         assertFalse(membershipManager.currentAssignment().isEmpty());
         return membershipManager;
     }
+
+    /**
+     * @return States where the member is not part of the group.
+     */
+    private static Stream<Arguments> notInGroupStates() {
+        return Stream.of(
+            Arguments.of(MemberState.UNSUBSCRIBED),
+            Arguments.of(MemberState.FENCED),
+            Arguments.of(MemberState.FATAL),
+            Arguments.of(MemberState.STALE));
+    }
+
 }
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index c826b8fd0c4..01dbb206a07 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -169,10 +169,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       startingTimestamp = startingTimestamp)
   }
 
-  // TODO: Enable this test for both protocols when the Jira tracking its 
failure (KAFKA-16008) is fixed. This
-  //       is done by setting the @MethodSource value to 
"getTestQuorumAndGroupProtocolParametersAll"
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testMaxPollIntervalMs(quorum: String, groupProtocol: String): Unit = {
     
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
1000.toString)
     
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
500.toString)


Reply via email to