This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 48449b68fd7 KAFKA-15554: Client state changes for handling one 
assignment at a time & minor improvements (#14413)
48449b68fd7 is described below

commit 48449b68fd70c96451e03afe7d3cd5b2800240fa
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Oct 18 11:10:18 2023 -0400

    KAFKA-15554: Client state changes for handling one assignment at a time & 
minor improvements (#14413)
    
    This patch includes:
    - target assignment changes : accepting only one at a time according to the 
updated protocol.
    - changes for error handling, leaving responsibility in the 
heartbeatManager and exposing only the functionality for when the state needs 
to be updated (on successful HB, on fencing, on fatal failure)
    - allow transitions for failures when joining
    - tests & minor improvements/fixes addressing initial version review
    
    Reviewers: Kirk True <[email protected]>, Philip Nee <[email protected]>, 
David Jacot <[email protected]>
---
 .../consumer/internals/AssignorSelection.java      |  88 ---------
 .../internals/DefaultBackgroundThread.java         |   2 +-
 .../internals/HeartbeatRequestManager.java         |   2 +-
 .../clients/consumer/internals/MemberState.java    |   2 +-
 .../consumer/internals/MembershipManager.java      |  46 +++--
 .../consumer/internals/MembershipManagerImpl.java  | 212 +++++++++++++--------
 .../clients/consumer/AssignorSelectionTest.java    |  58 ------
 .../internals/HeartbeatRequestManagerTest.java     |   4 +-
 .../internals/MembershipManagerImplTest.java       | 179 +++++++++++------
 9 files changed, 296 insertions(+), 297 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java
deleted file mode 100644
index 5eaae957ea5..00000000000
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.consumer.internals;
-
-import java.util.Objects;
-import java.util.Optional;
-
-/**
- * Selection of a type of assignor used by a member to get partitions assigned 
as part of a
- * consumer group. Currently supported assignors are:
- * <li>SERVER assignors</li>
- * <p/>
- * Server assignors include a name of the server assignor selected, ex. 
uniform, range.
- */
-public class AssignorSelection {
-    public enum Type { SERVER }
-
-    private final AssignorSelection.Type type;
-    private final Optional<String> serverAssignor;
-
-    private AssignorSelection(Type type, String serverAssignor) {
-        this.type = type;
-        if (type == Type.SERVER) {
-            this.serverAssignor = Optional.ofNullable(serverAssignor);
-        } else {
-            throw new IllegalArgumentException("Unsupported assignor type " + 
type);
-        }
-    }
-
-    public static AssignorSelection newServerAssignor(String serverAssignor) {
-        if (serverAssignor == null) {
-            throw new IllegalArgumentException("Selected server assignor name 
cannot be null");
-        }
-        if (serverAssignor.isEmpty()) {
-            throw new IllegalArgumentException("Selected server assignor name 
cannot be empty");
-        }
-        return new AssignorSelection(Type.SERVER, serverAssignor);
-    }
-
-    public static AssignorSelection defaultAssignor() {
-        // TODO: review default selection
-        return new AssignorSelection(Type.SERVER, null);
-    }
-
-    public Optional<String> serverAssignor() {
-        return this.serverAssignor;
-    }
-
-    public Type type() {
-        return this.type;
-    }
-
-    @Override
-    public boolean equals(Object assignorSelection) {
-        if (this == assignorSelection) return true;
-        if (assignorSelection == null || getClass() != 
assignorSelection.getClass()) return false;
-        return Objects.equals(((AssignorSelection) assignorSelection).type, 
this.type) &&
-                Objects.equals(((AssignorSelection) 
assignorSelection).serverAssignor, this.serverAssignor);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(type, serverAssignor);
-    }
-
-    @Override
-    public String toString() {
-        return "AssignorSelection(" +
-                "type=" + type + ", " +
-                "serverAssignor='" + serverAssignor + '\'' +
-                ')';
-    }
-}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
index 774f4a141b8..74322e6fec0 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
@@ -194,7 +194,7 @@ public class DefaultBackgroundThread extends KafkaThread {
                         config,
                         coordinatorRequestManager,
                         groupState);
-                MembershipManager membershipManager = new 
MembershipManagerImpl(groupState.groupId);
+                MembershipManager membershipManager = new 
MembershipManagerImpl(groupState.groupId, logContext);
                 heartbeatRequestManager = new HeartbeatRequestManager(
                         this.time,
                         logContext,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
index 7941c4effb2..2e821693fd6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -179,7 +179,7 @@ public class HeartbeatRequestManager implements 
RequestManager {
             data.setSubscribedTopicNames(new 
ArrayList<>(this.subscriptions.subscription()));
         }
 
-        
this.membershipManager.assignorSelection().serverAssignor().ifPresent(data::setServerAssignor);
+        
this.membershipManager.serverAssignor().ifPresent(data::setServerAssignor);
 
         NetworkClientDelegate.UnsentRequest request = new 
NetworkClientDelegate.UnsentRequest(
             new ConsumerGroupHeartbeatRequest.Builder(data),
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
index 4da2eb54e5c..ebcb279cf37 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
@@ -63,7 +63,7 @@ public enum MemberState {
 
         RECONCILING.previousValidStates = Arrays.asList(STABLE, UNJOINED);
 
-        FAILED.previousValidStates = Arrays.asList(STABLE, RECONCILING);
+        FAILED.previousValidStates = Arrays.asList(UNJOINED, STABLE, 
RECONCILING);
 
         FENCED.previousValidStates = Arrays.asList(STABLE, RECONCILING);
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
index c0fb9ed3903..8a95a80c659 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
@@ -21,7 +21,8 @@ import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import java.util.Optional;
 
 /**
- * Manages group membership for a single member.
+ * A stateful object tracking the state of a single member in relationship to 
a consumer group:
+ * <p/>
  * Responsible for:
  * <li>Keeping member state</li>
  * <li>Keeping assignment for the member</li>
@@ -29,50 +30,73 @@ import java.util.Optional;
  */
 public interface MembershipManager {
 
+    /**
+     * @return Group ID of the consumer group the member is part of (or wants 
to be part of).
+     */
     String groupId();
 
+    /**
+     * @return Instance ID used by the member when joining the group. If 
non-empty, it will indicate that
+     * this is a static member.
+     */
     Optional<String> groupInstanceId();
 
+    /**
+     * @return Member ID assigned by the server to this member when it joins 
the consumer group.
+     */
     String memberId();
 
+    /**
+     * @return Current epoch of the member, maintained by the server.
+     */
     int memberEpoch();
 
+    /**
+     * @return Current state of this member in relationship to a consumer 
group, as defined in
+     * {@link MemberState}.
+     */
     MemberState state();
 
     /**
-     * Update the current state of the member based on a heartbeat response
+     * Update member info and transition member state based on a heartbeat 
response.
+     *
+     * @param response Heartbeat response to extract member info and errors 
from.
      */
     void updateState(ConsumerGroupHeartbeatResponseData response);
 
     /**
-     * Returns the {@link AssignorSelection} for the member
+     * @return Server-side assignor implementation configured for the member, 
that will be sent
+     * out to the server to be used. If empty, then the server will select the 
assignor.
      */
-    AssignorSelection assignorSelection();
+    Optional<String> serverAssignor();
 
     /**
-     * Returns the current assignment for the member
+     * @return Current assignment for the member.
      */
-    ConsumerGroupHeartbeatResponseData.Assignment assignment();
+    ConsumerGroupHeartbeatResponseData.Assignment currentAssignment();
 
     /**
      * Update the assignment for the member, indicating that the provided 
assignment is the new
      * current assignment.
      */
-    void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment 
assignment);
+    void 
onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment 
assignment);
 
     /**
-     * Transition the member to the FENCED state.  This is only invoked when 
the heartbeat returns a
-     * FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID error code.
+     * Transition the member to the FENCED state and update the member info as 
required. This is
+     * only invoked when the heartbeat returns a FENCED_MEMBER_EPOCH or 
UNKNOWN_MEMBER_ID error.
+     * code.
      */
     void transitionToFenced();
 
     /**
-     * Transition the member to the FAILED state.  This is invoked when the 
heartbeat returns a non-retriable error.
+     * Transition the member to the FAILED state and update the member info as 
required. This is
+     * invoked when un-recoverable errors occur (ex. when the heartbeat 
returns a non-retriable
+     * error or when errors occur while executing the user-provided callbacks)
      */
     void transitionToFailed();
 
     /**
-     * Return true if the member should send heartbeat to the coordinator
+     * @return True if the member should send heartbeat to the coordinator.
      */
     boolean shouldSendHeartbeat();
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index f4f97bb36f9..2a9a5d2992d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -19,100 +19,143 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
 
 import java.util.Optional;
 
 /**
- * Membership manager that maintains group membership for a single member 
following the new
+ * Membership manager that maintains group membership for a single member, 
following the new
  * consumer group protocol.
  * <p/>
- * This keeps membership state and assignment updated in-memory, based on the 
heartbeat responses
- * the member receives. It is also responsible for computing assignment for 
the group based on
- * the metadata, if the member has been selected by the broker to do so.
+ * This is responsible for:
+ * <li>Keeping member info (ex. member id, member epoch, assignment, etc.)</li>
+ * <li>Keeping member state as defined in {@link MemberState}.</li>
+ * <p/>
+ * Member info and state are updated based on the heartbeat responses the 
member receives.
  */
 public class MembershipManagerImpl implements MembershipManager {
 
+    /**
+     * Group ID of the consumer group the member will be part of, provided 
when creating the current
+     * membership manager.
+     */
     private final String groupId;
+
+    /**
+     * Group instance ID to be used by the member, provided when creating the 
current membership manager.
+     */
     private final Optional<String> groupInstanceId;
+
+    /**
+     * Member ID assigned by the server to the member, received in a heartbeat 
response when
+     * joining the group specified in {@link #groupId}
+     */
     private String memberId;
+
+    /**
+     * Current epoch of the member. It will be set to 0 by the member, and 
provided to the server
+     * on the heartbeat request, to join the group. It will be then maintained 
by the server,
+     * incremented as the member reconciles and acknowledges the assignments 
it receives. It will
+     * be reset to 0 if the member gets fenced.
+     */
     private int memberEpoch;
+
+    /**
+     * Current state of this member as part of the consumer group, as defined 
in {@link MemberState}
+     */
     private MemberState state;
-    private AssignorSelection assignorSelection;
+
+    /**
+     * Name of the server-side assignor this member has configured to use. It 
will be sent
+     * out to the server on the {@link ConsumerGroupHeartbeatRequest}. If not 
defined, the server
+     * will select the assignor implementation to use.
+     */
+    private final Optional<String> serverAssignor;
 
     /**
      * Assignment that the member received from the server and successfully 
processed.
      */
     private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+
     /**
      * Assignment that the member received from the server but hasn't 
completely processed
      * yet.
      */
     private Optional<ConsumerGroupHeartbeatResponseData.Assignment> 
targetAssignment;
+
     /**
-     * Latest assignment that the member received from the server while a 
{@link #targetAssignment}
-     * was in process.
+     * Logger.
      */
-    private Optional<ConsumerGroupHeartbeatResponseData.Assignment> 
nextTargetAssignment;
+    private final Logger log;
 
-    public MembershipManagerImpl(String groupId) {
-        this(groupId, null, null);
+    public MembershipManagerImpl(String groupId, LogContext logContext) {
+        this(groupId, null, null, logContext);
     }
 
-    public MembershipManagerImpl(String groupId, String groupInstanceId, 
AssignorSelection assignorSelection) {
+    public MembershipManagerImpl(String groupId,
+                                 String groupInstanceId,
+                                 String serverAssignor,
+                                 LogContext logContext) {
         this.groupId = groupId;
         this.state = MemberState.UNJOINED;
-        if (assignorSelection == null) {
-            setAssignorSelection(AssignorSelection.defaultAssignor());
-        } else {
-            setAssignorSelection(assignorSelection);
-        }
+        this.serverAssignor = Optional.ofNullable(serverAssignor);
         this.groupInstanceId = Optional.ofNullable(groupInstanceId);
         this.targetAssignment = Optional.empty();
-        this.nextTargetAssignment = Optional.empty();
+        this.log = logContext.logger(MembershipManagerImpl.class);
     }
 
     /**
-     * Update assignor selection for the member.
+     * Update the member state, setting it to the nextState only if it is a 
valid transition.
      *
-     * @param assignorSelection New assignor selection
-     * @throws IllegalArgumentException If the provided assignor selection is 
null
+     * @throws IllegalStateException If transitioning from the member {@link 
#state} to the
+     *                               nextState is not allowed as defined in 
{@link MemberState}.
      */
-    public final void setAssignorSelection(AssignorSelection 
assignorSelection) {
-        if (assignorSelection == null) {
-            throw new IllegalArgumentException("Assignor selection cannot be 
null");
-        }
-        this.assignorSelection = assignorSelection;
-    }
-
     private void transitionTo(MemberState nextState) {
         if (!this.state.equals(nextState) && 
!nextState.getPreviousValidStates().contains(state)) {
-            // TODO: handle invalid state transition
-            throw new RuntimeException(String.format("Invalid state transition 
from %s to %s",
+            throw new IllegalStateException(String.format("Invalid state 
transition from %s to %s",
                     state, nextState));
         }
+        log.trace("Member {} transitioned from {} to {}.", memberId, state, 
nextState);
         this.state = nextState;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public String groupId() {
         return groupId;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public Optional<String> groupInstanceId() {
         return groupInstanceId;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public String memberId() {
         return memberId;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public int memberEpoch() {
         return memberEpoch;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public void updateState(ConsumerGroupHeartbeatResponseData response) {
         if (response.errorCode() != Errors.NONE.code()) {
@@ -120,7 +163,7 @@ public class MembershipManagerImpl implements 
MembershipManager {
                     "Unexpected error in Heartbeat response. Expected no 
error, but received: %s",
                     Errors.forCode(response.errorCode())
             );
-            throw new IllegalStateException(errorMessage);
+            throw new IllegalArgumentException(errorMessage);
         }
         this.memberId = response.memberId();
         this.memberEpoch = response.memberEpoch();
@@ -131,14 +174,21 @@ public class MembershipManagerImpl implements 
MembershipManager {
         maybeTransitionToStable();
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public void transitionToFailed() {
+        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
         transitionTo(MemberState.FAILED);
     }
 
@@ -160,92 +210,92 @@ public class MembershipManagerImpl implements 
MembershipManager {
         return state.equals(MemberState.STABLE);
     }
 
+    /**
+     * Take new target assignment received from the server and set it as 
targetAssignment to be
+     * processed. Following the consumer group protocol, the server won't send 
a new target
+     * member while a previous one hasn't been acknowledged by the member, so 
this will fail
+     * if a target assignment already exists.
+     *
+     * @throws IllegalStateException If a target assignment already exists.
+     */
     private void 
setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment 
newTargetAssignment) {
         if (!targetAssignment.isPresent()) {
+            log.info("Member {} accepted new target assignment {} to 
reconcile", memberId, newTargetAssignment);
             targetAssignment = Optional.of(newTargetAssignment);
         } else {
-            // Keep the latest next target assignment
-            nextTargetAssignment = Optional.of(newTargetAssignment);
+            transitionToFailed();
+            throw new IllegalStateException("Cannot set new target assignment 
because a " +
+                    "previous one pending to be reconciled already exists.");
         }
     }
 
-    private boolean hasPendingTargetAssignment() {
-        return targetAssignment.isPresent() || 
nextTargetAssignment.isPresent();
-    }
-
-
     /**
-     * Update state and assignment as the member has successfully processed a 
new target
-     * assignment.
-     * This indicates the end of the reconciliation phase for the member, and 
makes the target
-     * assignment the new current assignment.
-     *
-     * @param assignment Target assignment the member was able to successfully 
process
+     * Returns true if the member has a target assignment being processed.
      */
-    public void 
onAssignmentProcessSuccess(ConsumerGroupHeartbeatResponseData.Assignment 
assignment) {
-        updateAssignment(assignment);
-        transitionTo(MemberState.STABLE);
-    }
-
-    /**
-     * Update state and member info as the member was not able to process the 
assignment, due to
-     * errors in the execution of the user-provided callbacks.
-     *
-     * @param error Exception found during the execution of the user-provided 
callbacks
-     */
-    public void onAssignmentProcessFailure(Throwable error) {
-        transitionTo(MemberState.FAILED);
-        // TODO: update member info appropriately, to clear up whatever 
shouldn't be kept in
-        //  this unrecoverable state
+    private boolean hasPendingTargetAssignment() {
+        return targetAssignment.isPresent();
     }
 
     private void resetEpoch() {
         this.memberEpoch = 0;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public MemberState state() {
         return state;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public AssignorSelection assignorSelection() {
-        return this.assignorSelection;
+    public Optional<String> serverAssignor() {
+        return this.serverAssignor;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public ConsumerGroupHeartbeatResponseData.Assignment assignment() {
+    public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() {
         return this.currentAssignment;
     }
 
-    // VisibleForTesting
+
+    /**
+     * @return Assignment that the member received from the server but hasn't 
completely processed
+     * yet. Visible for testing.
+     */
     Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment() 
{
         return targetAssignment;
     }
 
-    // VisibleForTesting
-    Optional<ConsumerGroupHeartbeatResponseData.Assignment> 
nextTargetAssignment() {
-        return nextTargetAssignment;
-    }
-
     /**
-     * Set the current assignment for the member. This indicates that the 
reconciliation of the
-     * target assignment has been successfully completed.
-     * This will clear the {@link #targetAssignment}, and take on the
-     * {@link #nextTargetAssignment} if any.
+     * This indicates that the reconciliation of the target assignment has 
been successfully
+     * completed, so it will make it effective by assigning it to the current 
assignment.
      *
-     * @param assignment Assignment that has been successfully processed as 
part of the
-     *                   reconciliation process.
+     * @params Assignment that has been successfully reconciled. This is 
expected to
+     * match the target assignment defined in {@link #targetAssignment()}
      */
     @Override
-    public void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment 
assignment) {
-        this.currentAssignment = assignment;
-        if (!nextTargetAssignment.isPresent()) {
-            targetAssignment = Optional.empty();
-        } else {
-            targetAssignment = Optional.of(nextTargetAssignment.get());
-            nextTargetAssignment = Optional.empty();
+    public void 
onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment 
assignment) {
+        if (assignment == null) {
+            throw new IllegalArgumentException("Assignment cannot be null");
         }
-        maybeTransitionToStable();
+        if (!assignment.equals(targetAssignment.orElse(null))) {
+            // This could be simplified to remove the assignment param and 
just assume that what
+            // was reconciled was the targetAssignment, but keeping it 
explicit and failing fast
+            // here to uncover any issues in the interaction of the assignment 
processing logic
+            // and this.
+            throw new IllegalStateException(String.format("Reconciled 
assignment %s does not " +
+                            "match the expected target assignment %s", 
assignment,
+                    targetAssignment.orElse(null)));
+        }
+        this.currentAssignment = assignment;
+        targetAssignment = Optional.empty();
+        transitionTo(MemberState.STABLE);
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java
deleted file mode 100644
index 57c61e3dc14..00000000000
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.consumer;
-
-import org.apache.kafka.clients.consumer.internals.AssignorSelection;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class AssignorSelectionTest {
-
-    @Test
-    public void testServerAssignorCannotBeNullOrEmptyIfSelected() {
-        assertThrows(IllegalArgumentException.class,
-                () -> AssignorSelection.newServerAssignor(null));
-        assertThrows(IllegalArgumentException.class,
-                () -> AssignorSelection.newServerAssignor(""));
-    }
-
-    @Test
-    public void testEquals() {
-        // Server assignors
-        AssignorSelection selection1 = 
AssignorSelection.newServerAssignor("range");
-        AssignorSelection selection2 = 
AssignorSelection.newServerAssignor("range");
-        assertEquals(selection1, selection1);
-        assertEquals(selection1, selection2);
-        AssignorSelection selection3 = 
AssignorSelection.newServerAssignor("uniform");
-        assertNotEquals(selection1, selection3);
-        assertNotEquals(selection1, null);
-    }
-
-    @Test
-    public void testServerAssignorSelection() {
-        String assignorName = "uniform";
-        AssignorSelection selection = 
AssignorSelection.newServerAssignor(assignorName);
-        assertEquals(AssignorSelection.Type.SERVER, selection.type());
-        assertTrue(selection.serverAssignor().isPresent());
-        assertEquals(assignorName, selection.serverAssignor().get());
-    }
-}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
index 598deace7f5..0856b4ebcb9 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -94,7 +94,7 @@ public class HeartbeatRequestManagerTest {
         coordinatorRequestManager = mock(CoordinatorRequestManager.class);
         
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", 9999)));
         subscriptionState = mock(SubscriptionState.class);
-        membershipManager = spy(new MembershipManagerImpl(GROUP_ID));
+        membershipManager = spy(new MembershipManagerImpl(GROUP_ID, 
logContext));
         heartbeatRequestState = 
mock(HeartbeatRequestManager.HeartbeatRequestState.class);
         errorEventHandler = mock(ErrorEventHandler.class);
         heartbeatRequestManager = createManager();
@@ -234,7 +234,7 @@ public class HeartbeatRequestManagerTest {
         Properties prop = createConsumerConfig();
         prop.setProperty(MAX_POLL_INTERVAL_MS_CONFIG, "10000");
         config = new ConsumerConfig(prop);
-        membershipManager = new MembershipManagerImpl(GROUP_ID, 
GROUP_INSTANCE_ID, null);
+        membershipManager = new MembershipManagerImpl(GROUP_ID, 
GROUP_INSTANCE_ID, null, logContext);
         heartbeatRequestState = new 
HeartbeatRequestManager.HeartbeatRequestState(
             logContext,
             time,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index 4b6aa80c04e..d78bbf2ab63 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -21,9 +21,12 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -35,40 +38,26 @@ public class MembershipManagerImplTest {
     private static final String GROUP_ID = "test-group";
     private static final String MEMBER_ID = "test-member-1";
     private static final int MEMBER_EPOCH = 1;
+    private final LogContext logContext = new LogContext();
 
     @Test
-    public void testMembershipManagerDefaultAssignor() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID);
-        assertEquals(AssignorSelection.defaultAssignor(), 
membershipManager.assignorSelection());
+    public void testMembershipManagerServerAssignor() {
+        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
+        assertEquals(Optional.empty(), membershipManager.serverAssignor());
 
-        membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1", 
null);
-        assertEquals(AssignorSelection.defaultAssignor(), 
membershipManager.assignorSelection());
-    }
-
-    @Test
-    public void testMembershipManagerAssignorSelectionUpdate() {
-        AssignorSelection firstAssignorSelection = 
AssignorSelection.newServerAssignor("uniform");
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, "instance1",
-                firstAssignorSelection);
-        assertEquals(firstAssignorSelection, 
membershipManager.assignorSelection());
-
-        AssignorSelection secondAssignorSelection = 
AssignorSelection.newServerAssignor("range");
-        membershipManager.setAssignorSelection(secondAssignorSelection);
-        assertEquals(secondAssignorSelection, 
membershipManager.assignorSelection());
-
-        assertThrows(IllegalArgumentException.class,
-                () -> membershipManager.setAssignorSelection(null));
+        membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1", 
"Uniform", logContext);
+        assertEquals(Optional.of("Uniform"), 
membershipManager.serverAssignor());
     }
 
     @Test
     public void testMembershipManagerInitSupportsEmptyGroupInstanceId() {
-        new MembershipManagerImpl(GROUP_ID);
-        new MembershipManagerImpl(GROUP_ID, null, 
AssignorSelection.defaultAssignor());
+        new MembershipManagerImpl(GROUP_ID, logContext);
+        new MembershipManagerImpl(GROUP_ID, null, null, logContext);
     }
 
     @Test
     public void testTransitionToReconcilingOnlyIfAssignmentReceived() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID);
+        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
         assertEquals(MemberState.UNJOINED, membershipManager.state());
 
         ConsumerGroupHeartbeatResponse responseWithoutAssignment =
@@ -84,7 +73,7 @@ public class MembershipManagerImplTest {
 
     @Test
     public void testMemberIdAndEpochResetOnFencedMembers() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID);
+        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
         ConsumerGroupHeartbeatResponse heartbeatResponse =
                 createConsumerGroupHeartbeatResponse(null);
         membershipManager.updateState(heartbeatResponse.data());
@@ -99,7 +88,7 @@ public class MembershipManagerImplTest {
 
     @Test
     public void testTransitionToFailure() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID);
+        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
         ConsumerGroupHeartbeatResponse heartbeatResponse =
                 createConsumerGroupHeartbeatResponse(null);
         membershipManager.updateState(heartbeatResponse.data());
@@ -112,62 +101,144 @@ public class MembershipManagerImplTest {
     }
 
     @Test
-    public void testUpdateAssignment() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID);
+    public void testFencingWhenStateIsStable() {
+        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
+        ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(null);
+        membershipManager.updateState(heartbeatResponse.data());
+        assertEquals(MemberState.STABLE, membershipManager.state());
+
+        testStateUpdateOnFenceError(membershipManager);
+    }
+
+    @Test
+    public void testFencingWhenStateIsReconciling() {
+        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
+        ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(createAssignment());
+        membershipManager.updateState(heartbeatResponse.data());
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+        testStateUpdateOnFenceError(membershipManager);
+    }
+
+    @Test
+    public void testFatalFailureWhenStateIsUnjoined() {
+        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
+        assertEquals(MemberState.UNJOINED, membershipManager.state());
+
+        testStateUpdateOnFatalFailure(membershipManager);
+    }
+
+    @Test
+    public void testFatalFailureWhenStateIsStable() {
+        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
+        ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(null);
+        membershipManager.updateState(heartbeatResponse.data());
+        assertEquals(MemberState.STABLE, membershipManager.state());
+
+        testStateUpdateOnFatalFailure(membershipManager);
+    }
+
+    @Test
+    public void testFencingShouldNotHappenWhenStateIsUnjoined() {
+        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
+        assertEquals(MemberState.UNJOINED, membershipManager.state());
+
+        // Getting fenced when the member is not part of the group is not 
expected and should
+        // fail with invalid transition.
+        assertThrows(IllegalStateException.class, 
membershipManager::transitionToFenced);
+    }
+
+    @Test
+    public void testUpdateStateFailsOnResponsesWithErrors() {
+        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
+        // Updating state with a heartbeat response containing errors cannot 
be performed and
+        // should fail.
+        ConsumerGroupHeartbeatResponse unknownMemberResponse =
+                
createConsumerGroupHeartbeatResponseWithError(Errors.UNKNOWN_MEMBER_ID);
+        assertThrows(IllegalArgumentException.class,
+                () -> 
membershipManager.updateState(unknownMemberResponse.data()));
+    }
+
+    @Test
+    public void testAssignmentUpdatedAsReceivedAndProcessed() {
+        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
         ConsumerGroupHeartbeatResponseData.Assignment newAssignment = 
createAssignment();
         ConsumerGroupHeartbeatResponse heartbeatResponse =
                 createConsumerGroupHeartbeatResponse(newAssignment);
         membershipManager.updateState(heartbeatResponse.data());
 
         // Target assignment should be in the process of being reconciled
-        checkAssignments(membershipManager, null, newAssignment, null);
+        checkAssignments(membershipManager, null, newAssignment);
+        // Mark assignment processing completed
+        membershipManager.onTargetAssignmentProcessComplete(newAssignment);
+        // Target assignment should now be the current assignment
+        checkAssignments(membershipManager, newAssignment, null);
     }
 
     @Test
-    public void testUpdateAssignmentReceivingAssignmentWhileAnotherInProcess() 
{
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID);
+    public void 
testMemberFailsIfAssignmentReceivedWhileAnotherOnBeingReconciled() {
+        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
         ConsumerGroupHeartbeatResponseData.Assignment newAssignment1 = 
createAssignment();
         
membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment1).data());
 
         // First target assignment received should be in the process of being 
reconciled
-        checkAssignments(membershipManager, null, newAssignment1, null);
+        checkAssignments(membershipManager, null, newAssignment1);
 
         // Second target assignment received while there is another one being 
reconciled
         ConsumerGroupHeartbeatResponseData.Assignment newAssignment2 = 
createAssignment();
-        
membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment2).data());
-        checkAssignments(membershipManager, null, newAssignment1, 
newAssignment2);
+        assertThrows(IllegalStateException.class,
+                () -> 
membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment2).data()));
+        assertEquals(MemberState.FAILED, membershipManager.state());
     }
 
     @Test
-    public void 
testNextTargetAssignmentHoldsLatestAssignmentReceivedWhileAnotherInProcess() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID);
-        ConsumerGroupHeartbeatResponseData.Assignment newAssignment1 = 
createAssignment();
-        
membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment1).data());
-
-        // First target assignment received, remains in the process of being 
reconciled
-        checkAssignments(membershipManager, null, newAssignment1, null);
-
-        // Second target assignment received while there is another one being 
reconciled
-        ConsumerGroupHeartbeatResponseData.Assignment newAssignment2 = 
createAssignment();
-        
membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment2).data());
-        checkAssignments(membershipManager, null, newAssignment1, 
newAssignment2);
+    public void 
testAssignmentUpdatedFailsIfAssignmentReconciledDoesNotMatchTargetAssignment() {
+        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
+        ConsumerGroupHeartbeatResponseData.Assignment targetAssignment = new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                .setTopicPartitions(Collections.singletonList(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                                .setTopicId(Uuid.randomUuid())
+                                .setPartitions(Arrays.asList(0, 1, 2))));
+        ConsumerGroupHeartbeatResponse heartbeatResponse =
+                createConsumerGroupHeartbeatResponse(targetAssignment);
+        membershipManager.updateState(heartbeatResponse.data());
 
-        // If more assignments are received while there is one being 
reconciled, the most recent
-        // assignment received is kept as nextTargetAssignment
-        ConsumerGroupHeartbeatResponseData.Assignment newAssignment3 = 
createAssignment();
-        
membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment3).data());
-        checkAssignments(membershipManager, null, newAssignment1, 
newAssignment3);
+        // Target assignment should be in the process of being reconciled
+        checkAssignments(membershipManager, null, targetAssignment);
+        // Mark assignment processing completed
+        ConsumerGroupHeartbeatResponseData.Assignment reconciled =
+                new ConsumerGroupHeartbeatResponseData.Assignment()
+                        .setTopicPartitions(Collections.singletonList(
+                                new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                                        .setTopicId(Uuid.randomUuid())
+                                        
.setPartitions(Collections.singletonList(0))));
+        assertThrows(IllegalStateException.class, () -> 
membershipManager.onTargetAssignmentProcessComplete(reconciled));
     }
 
     private void checkAssignments(
             MembershipManagerImpl membershipManager,
             ConsumerGroupHeartbeatResponseData.Assignment 
expectedCurrentAssignment,
-            ConsumerGroupHeartbeatResponseData.Assignment 
expectedTargetAssignment,
-            ConsumerGroupHeartbeatResponseData.Assignment 
expectedNextTargetAssignment) {
-        assertEquals(expectedCurrentAssignment, 
membershipManager.assignment());
+            ConsumerGroupHeartbeatResponseData.Assignment 
expectedTargetAssignment) {
+        assertEquals(expectedCurrentAssignment, 
membershipManager.currentAssignment());
         assertEquals(expectedTargetAssignment, 
membershipManager.targetAssignment().orElse(null));
-        assertEquals(expectedNextTargetAssignment, 
membershipManager.nextTargetAssignment().orElse(null));
+    }
 
+    private void testStateUpdateOnFenceError(MembershipManager 
membershipManager) {
+        membershipManager.transitionToFenced();
+        assertEquals(MemberState.FENCED, membershipManager.state());
+        // Should reset member epoch and keep member id
+        assertFalse(membershipManager.memberId().isEmpty());
+        assertEquals(0, membershipManager.memberEpoch());
+    }
+
+    private void testStateUpdateOnFatalFailure(MembershipManager 
membershipManager) {
+        String initialMemberId = membershipManager.memberId();
+        int initialMemberEpoch = membershipManager.memberEpoch();
+        membershipManager.transitionToFailed();
+        assertEquals(MemberState.FAILED, membershipManager.state());
+        // Should not reset member id or epoch
+        assertEquals(initialMemberId, membershipManager.memberId());
+        assertEquals(initialMemberEpoch, membershipManager.memberEpoch());
     }
 
     private ConsumerGroupHeartbeatResponse 
createConsumerGroupHeartbeatResponse(ConsumerGroupHeartbeatResponseData.Assignment
 assignment) {


Reply via email to