This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 48449b68fd7 KAFKA-15554: Client state changes for handling one
assignment at a time & minor improvements (#14413)
48449b68fd7 is described below
commit 48449b68fd70c96451e03afe7d3cd5b2800240fa
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Oct 18 11:10:18 2023 -0400
KAFKA-15554: Client state changes for handling one assignment at a time &
minor improvements (#14413)
This patch includes:
- target assignment changes : accepting only one at a time according to the
updated protocol.
- changes for error handling, leaving responsibility in the
heartbeatManager and exposing only the functionality for when the state needs
to be updated (on successful HB, on fencing, on fatal failure)
- allow transitions for failures when joining
- tests & minor improvements/fixes addressing initial version review
Reviewers: Kirk True <[email protected]>, Philip Nee <[email protected]>,
David Jacot <[email protected]>
---
.../consumer/internals/AssignorSelection.java | 88 ---------
.../internals/DefaultBackgroundThread.java | 2 +-
.../internals/HeartbeatRequestManager.java | 2 +-
.../clients/consumer/internals/MemberState.java | 2 +-
.../consumer/internals/MembershipManager.java | 46 +++--
.../consumer/internals/MembershipManagerImpl.java | 212 +++++++++++++--------
.../clients/consumer/AssignorSelectionTest.java | 58 ------
.../internals/HeartbeatRequestManagerTest.java | 4 +-
.../internals/MembershipManagerImplTest.java | 179 +++++++++++------
9 files changed, 296 insertions(+), 297 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java
deleted file mode 100644
index 5eaae957ea5..00000000000
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.consumer.internals;
-
-import java.util.Objects;
-import java.util.Optional;
-
-/**
- * Selection of a type of assignor used by a member to get partitions assigned
as part of a
- * consumer group. Currently supported assignors are:
- * <li>SERVER assignors</li>
- * <p/>
- * Server assignors include a name of the server assignor selected, ex.
uniform, range.
- */
-public class AssignorSelection {
- public enum Type { SERVER }
-
- private final AssignorSelection.Type type;
- private final Optional<String> serverAssignor;
-
- private AssignorSelection(Type type, String serverAssignor) {
- this.type = type;
- if (type == Type.SERVER) {
- this.serverAssignor = Optional.ofNullable(serverAssignor);
- } else {
- throw new IllegalArgumentException("Unsupported assignor type " +
type);
- }
- }
-
- public static AssignorSelection newServerAssignor(String serverAssignor) {
- if (serverAssignor == null) {
- throw new IllegalArgumentException("Selected server assignor name
cannot be null");
- }
- if (serverAssignor.isEmpty()) {
- throw new IllegalArgumentException("Selected server assignor name
cannot be empty");
- }
- return new AssignorSelection(Type.SERVER, serverAssignor);
- }
-
- public static AssignorSelection defaultAssignor() {
- // TODO: review default selection
- return new AssignorSelection(Type.SERVER, null);
- }
-
- public Optional<String> serverAssignor() {
- return this.serverAssignor;
- }
-
- public Type type() {
- return this.type;
- }
-
- @Override
- public boolean equals(Object assignorSelection) {
- if (this == assignorSelection) return true;
- if (assignorSelection == null || getClass() !=
assignorSelection.getClass()) return false;
- return Objects.equals(((AssignorSelection) assignorSelection).type,
this.type) &&
- Objects.equals(((AssignorSelection)
assignorSelection).serverAssignor, this.serverAssignor);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(type, serverAssignor);
- }
-
- @Override
- public String toString() {
- return "AssignorSelection(" +
- "type=" + type + ", " +
- "serverAssignor='" + serverAssignor + '\'' +
- ')';
- }
-}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
index 774f4a141b8..74322e6fec0 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
@@ -194,7 +194,7 @@ public class DefaultBackgroundThread extends KafkaThread {
config,
coordinatorRequestManager,
groupState);
- MembershipManager membershipManager = new
MembershipManagerImpl(groupState.groupId);
+ MembershipManager membershipManager = new
MembershipManagerImpl(groupState.groupId, logContext);
heartbeatRequestManager = new HeartbeatRequestManager(
this.time,
logContext,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
index 7941c4effb2..2e821693fd6 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -179,7 +179,7 @@ public class HeartbeatRequestManager implements
RequestManager {
data.setSubscribedTopicNames(new
ArrayList<>(this.subscriptions.subscription()));
}
-
this.membershipManager.assignorSelection().serverAssignor().ifPresent(data::setServerAssignor);
+
this.membershipManager.serverAssignor().ifPresent(data::setServerAssignor);
NetworkClientDelegate.UnsentRequest request = new
NetworkClientDelegate.UnsentRequest(
new ConsumerGroupHeartbeatRequest.Builder(data),
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
index 4da2eb54e5c..ebcb279cf37 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
@@ -63,7 +63,7 @@ public enum MemberState {
RECONCILING.previousValidStates = Arrays.asList(STABLE, UNJOINED);
- FAILED.previousValidStates = Arrays.asList(STABLE, RECONCILING);
+ FAILED.previousValidStates = Arrays.asList(UNJOINED, STABLE,
RECONCILING);
FENCED.previousValidStates = Arrays.asList(STABLE, RECONCILING);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
index c0fb9ed3903..8a95a80c659 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
@@ -21,7 +21,8 @@ import
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import java.util.Optional;
/**
- * Manages group membership for a single member.
+ * A stateful object tracking the state of a single member in relationship to
a consumer group:
+ * <p/>
* Responsible for:
* <li>Keeping member state</li>
* <li>Keeping assignment for the member</li>
@@ -29,50 +30,73 @@ import java.util.Optional;
*/
public interface MembershipManager {
+ /**
+ * @return Group ID of the consumer group the member is part of (or wants
to be part of).
+ */
String groupId();
+ /**
+ * @return Instance ID used by the member when joining the group. If
non-empty, it will indicate that
+ * this is a static member.
+ */
Optional<String> groupInstanceId();
+ /**
+ * @return Member ID assigned by the server to this member when it joins
the consumer group.
+ */
String memberId();
+ /**
+ * @return Current epoch of the member, maintained by the server.
+ */
int memberEpoch();
+ /**
+ * @return Current state of this member in relationship to a consumer
group, as defined in
+ * {@link MemberState}.
+ */
MemberState state();
/**
- * Update the current state of the member based on a heartbeat response
+ * Update member info and transition member state based on a heartbeat
response.
+ *
+ * @param response Heartbeat response to extract member info and errors
from.
*/
void updateState(ConsumerGroupHeartbeatResponseData response);
/**
- * Returns the {@link AssignorSelection} for the member
+ * @return Server-side assignor implementation configured for the member,
that will be sent
+ * out to the server to be used. If empty, then the server will select the
assignor.
*/
- AssignorSelection assignorSelection();
+ Optional<String> serverAssignor();
/**
- * Returns the current assignment for the member
+ * @return Current assignment for the member.
*/
- ConsumerGroupHeartbeatResponseData.Assignment assignment();
+ ConsumerGroupHeartbeatResponseData.Assignment currentAssignment();
/**
* Update the assignment for the member, indicating that the provided
assignment is the new
* current assignment.
*/
- void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment
assignment);
+ void
onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment
assignment);
/**
- * Transition the member to the FENCED state. This is only invoked when
the heartbeat returns a
- * FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID error code.
+ * Transition the member to the FENCED state and update the member info as
required. This is
+ * only invoked when the heartbeat returns a FENCED_MEMBER_EPOCH or
UNKNOWN_MEMBER_ID error.
+ * code.
*/
void transitionToFenced();
/**
- * Transition the member to the FAILED state. This is invoked when the
heartbeat returns a non-retriable error.
+ * Transition the member to the FAILED state and update the member info as
required. This is
+ * invoked when un-recoverable errors occur (ex. when the heartbeat
returns a non-retriable
+ * error or when errors occur while executing the user-provided callbacks)
*/
void transitionToFailed();
/**
- * Return true if the member should send heartbeat to the coordinator
+ * @return True if the member should send heartbeat to the coordinator.
*/
boolean shouldSendHeartbeat();
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index f4f97bb36f9..2a9a5d2992d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -19,100 +19,143 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
import java.util.Optional;
/**
- * Membership manager that maintains group membership for a single member
following the new
+ * Membership manager that maintains group membership for a single member,
following the new
* consumer group protocol.
* <p/>
- * This keeps membership state and assignment updated in-memory, based on the
heartbeat responses
- * the member receives. It is also responsible for computing assignment for
the group based on
- * the metadata, if the member has been selected by the broker to do so.
+ * This is responsible for:
+ * <li>Keeping member info (ex. member id, member epoch, assignment, etc.)</li>
+ * <li>Keeping member state as defined in {@link MemberState}.</li>
+ * <p/>
+ * Member info and state are updated based on the heartbeat responses the
member receives.
*/
public class MembershipManagerImpl implements MembershipManager {
+ /**
+ * Group ID of the consumer group the member will be part of, provided
when creating the current
+ * membership manager.
+ */
private final String groupId;
+
+ /**
+ * Group instance ID to be used by the member, provided when creating the
current membership manager.
+ */
private final Optional<String> groupInstanceId;
+
+ /**
+ * Member ID assigned by the server to the member, received in a heartbeat
response when
+ * joining the group specified in {@link #groupId}
+ */
private String memberId;
+
+ /**
+ * Current epoch of the member. It will be set to 0 by the member, and
provided to the server
+ * on the heartbeat request, to join the group. It will be then maintained
by the server,
+ * incremented as the member reconciles and acknowledges the assignments
it receives. It will
+ * be reset to 0 if the member gets fenced.
+ */
private int memberEpoch;
+
+ /**
+ * Current state of this member as part of the consumer group, as defined
in {@link MemberState}
+ */
private MemberState state;
- private AssignorSelection assignorSelection;
+
+ /**
+ * Name of the server-side assignor this member has configured to use. It
will be sent
+ * out to the server on the {@link ConsumerGroupHeartbeatRequest}. If not
defined, the server
+ * will select the assignor implementation to use.
+ */
+ private final Optional<String> serverAssignor;
/**
* Assignment that the member received from the server and successfully
processed.
*/
private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+
/**
* Assignment that the member received from the server but hasn't
completely processed
* yet.
*/
private Optional<ConsumerGroupHeartbeatResponseData.Assignment>
targetAssignment;
+
/**
- * Latest assignment that the member received from the server while a
{@link #targetAssignment}
- * was in process.
+ * Logger.
*/
- private Optional<ConsumerGroupHeartbeatResponseData.Assignment>
nextTargetAssignment;
+ private final Logger log;
- public MembershipManagerImpl(String groupId) {
- this(groupId, null, null);
+ public MembershipManagerImpl(String groupId, LogContext logContext) {
+ this(groupId, null, null, logContext);
}
- public MembershipManagerImpl(String groupId, String groupInstanceId,
AssignorSelection assignorSelection) {
+ public MembershipManagerImpl(String groupId,
+ String groupInstanceId,
+ String serverAssignor,
+ LogContext logContext) {
this.groupId = groupId;
this.state = MemberState.UNJOINED;
- if (assignorSelection == null) {
- setAssignorSelection(AssignorSelection.defaultAssignor());
- } else {
- setAssignorSelection(assignorSelection);
- }
+ this.serverAssignor = Optional.ofNullable(serverAssignor);
this.groupInstanceId = Optional.ofNullable(groupInstanceId);
this.targetAssignment = Optional.empty();
- this.nextTargetAssignment = Optional.empty();
+ this.log = logContext.logger(MembershipManagerImpl.class);
}
/**
- * Update assignor selection for the member.
+ * Update the member state, setting it to the nextState only if it is a
valid transition.
*
- * @param assignorSelection New assignor selection
- * @throws IllegalArgumentException If the provided assignor selection is
null
+ * @throws IllegalStateException If transitioning from the member {@link
#state} to the
+ * nextState is not allowed as defined in
{@link MemberState}.
*/
- public final void setAssignorSelection(AssignorSelection
assignorSelection) {
- if (assignorSelection == null) {
- throw new IllegalArgumentException("Assignor selection cannot be
null");
- }
- this.assignorSelection = assignorSelection;
- }
-
private void transitionTo(MemberState nextState) {
if (!this.state.equals(nextState) &&
!nextState.getPreviousValidStates().contains(state)) {
- // TODO: handle invalid state transition
- throw new RuntimeException(String.format("Invalid state transition
from %s to %s",
+ throw new IllegalStateException(String.format("Invalid state
transition from %s to %s",
state, nextState));
}
+ log.trace("Member {} transitioned from {} to {}.", memberId, state,
nextState);
this.state = nextState;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public String groupId() {
return groupId;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public Optional<String> groupInstanceId() {
return groupInstanceId;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public String memberId() {
return memberId;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public int memberEpoch() {
return memberEpoch;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void updateState(ConsumerGroupHeartbeatResponseData response) {
if (response.errorCode() != Errors.NONE.code()) {
@@ -120,7 +163,7 @@ public class MembershipManagerImpl implements
MembershipManager {
"Unexpected error in Heartbeat response. Expected no
error, but received: %s",
Errors.forCode(response.errorCode())
);
- throw new IllegalStateException(errorMessage);
+ throw new IllegalArgumentException(errorMessage);
}
this.memberId = response.memberId();
this.memberEpoch = response.memberEpoch();
@@ -131,14 +174,21 @@ public class MembershipManagerImpl implements
MembershipManager {
maybeTransitionToStable();
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void transitionToFenced() {
resetEpoch();
transitionTo(MemberState.FENCED);
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void transitionToFailed() {
+ log.error("Member {} transitioned to {} state", memberId,
MemberState.FAILED);
transitionTo(MemberState.FAILED);
}
@@ -160,92 +210,92 @@ public class MembershipManagerImpl implements
MembershipManager {
return state.equals(MemberState.STABLE);
}
+ /**
+ * Take new target assignment received from the server and set it as
targetAssignment to be
+ * processed. Following the consumer group protocol, the server won't send
a new target
+ * member while a previous one hasn't been acknowledged by the member, so
this will fail
+ * if a target assignment already exists.
+ *
+ * @throws IllegalStateException If a target assignment already exists.
+ */
private void
setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment
newTargetAssignment) {
if (!targetAssignment.isPresent()) {
+ log.info("Member {} accepted new target assignment {} to
reconcile", memberId, newTargetAssignment);
targetAssignment = Optional.of(newTargetAssignment);
} else {
- // Keep the latest next target assignment
- nextTargetAssignment = Optional.of(newTargetAssignment);
+ transitionToFailed();
+ throw new IllegalStateException("Cannot set new target assignment
because a " +
+ "previous one pending to be reconciled already exists.");
}
}
- private boolean hasPendingTargetAssignment() {
- return targetAssignment.isPresent() ||
nextTargetAssignment.isPresent();
- }
-
-
/**
- * Update state and assignment as the member has successfully processed a
new target
- * assignment.
- * This indicates the end of the reconciliation phase for the member, and
makes the target
- * assignment the new current assignment.
- *
- * @param assignment Target assignment the member was able to successfully
process
+ * Returns true if the member has a target assignment being processed.
*/
- public void
onAssignmentProcessSuccess(ConsumerGroupHeartbeatResponseData.Assignment
assignment) {
- updateAssignment(assignment);
- transitionTo(MemberState.STABLE);
- }
-
- /**
- * Update state and member info as the member was not able to process the
assignment, due to
- * errors in the execution of the user-provided callbacks.
- *
- * @param error Exception found during the execution of the user-provided
callbacks
- */
- public void onAssignmentProcessFailure(Throwable error) {
- transitionTo(MemberState.FAILED);
- // TODO: update member info appropriately, to clear up whatever
shouldn't be kept in
- // this unrecoverable state
+ private boolean hasPendingTargetAssignment() {
+ return targetAssignment.isPresent();
}
private void resetEpoch() {
this.memberEpoch = 0;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public MemberState state() {
return state;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
- public AssignorSelection assignorSelection() {
- return this.assignorSelection;
+ public Optional<String> serverAssignor() {
+ return this.serverAssignor;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
- public ConsumerGroupHeartbeatResponseData.Assignment assignment() {
+ public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() {
return this.currentAssignment;
}
- // VisibleForTesting
+
+ /**
+ * @return Assignment that the member received from the server but hasn't
completely processed
+ * yet. Visible for testing.
+ */
Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment()
{
return targetAssignment;
}
- // VisibleForTesting
- Optional<ConsumerGroupHeartbeatResponseData.Assignment>
nextTargetAssignment() {
- return nextTargetAssignment;
- }
-
/**
- * Set the current assignment for the member. This indicates that the
reconciliation of the
- * target assignment has been successfully completed.
- * This will clear the {@link #targetAssignment}, and take on the
- * {@link #nextTargetAssignment} if any.
+ * This indicates that the reconciliation of the target assignment has
been successfully
+ * completed, so it will make it effective by assigning it to the current
assignment.
*
- * @param assignment Assignment that has been successfully processed as
part of the
- * reconciliation process.
+ * @params Assignment that has been successfully reconciled. This is
expected to
+ * match the target assignment defined in {@link #targetAssignment()}
*/
@Override
- public void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment
assignment) {
- this.currentAssignment = assignment;
- if (!nextTargetAssignment.isPresent()) {
- targetAssignment = Optional.empty();
- } else {
- targetAssignment = Optional.of(nextTargetAssignment.get());
- nextTargetAssignment = Optional.empty();
+ public void
onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment
assignment) {
+ if (assignment == null) {
+ throw new IllegalArgumentException("Assignment cannot be null");
}
- maybeTransitionToStable();
+ if (!assignment.equals(targetAssignment.orElse(null))) {
+ // This could be simplified to remove the assignment param and
just assume that what
+ // was reconciled was the targetAssignment, but keeping it
explicit and failing fast
+ // here to uncover any issues in the interaction of the assignment
processing logic
+ // and this.
+ throw new IllegalStateException(String.format("Reconciled
assignment %s does not " +
+ "match the expected target assignment %s",
assignment,
+ targetAssignment.orElse(null)));
+ }
+ this.currentAssignment = assignment;
+ targetAssignment = Optional.empty();
+ transitionTo(MemberState.STABLE);
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java
deleted file mode 100644
index 57c61e3dc14..00000000000
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.consumer;
-
-import org.apache.kafka.clients.consumer.internals.AssignorSelection;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class AssignorSelectionTest {
-
- @Test
- public void testServerAssignorCannotBeNullOrEmptyIfSelected() {
- assertThrows(IllegalArgumentException.class,
- () -> AssignorSelection.newServerAssignor(null));
- assertThrows(IllegalArgumentException.class,
- () -> AssignorSelection.newServerAssignor(""));
- }
-
- @Test
- public void testEquals() {
- // Server assignors
- AssignorSelection selection1 =
AssignorSelection.newServerAssignor("range");
- AssignorSelection selection2 =
AssignorSelection.newServerAssignor("range");
- assertEquals(selection1, selection1);
- assertEquals(selection1, selection2);
- AssignorSelection selection3 =
AssignorSelection.newServerAssignor("uniform");
- assertNotEquals(selection1, selection3);
- assertNotEquals(selection1, null);
- }
-
- @Test
- public void testServerAssignorSelection() {
- String assignorName = "uniform";
- AssignorSelection selection =
AssignorSelection.newServerAssignor(assignorName);
- assertEquals(AssignorSelection.Type.SERVER, selection.type());
- assertTrue(selection.serverAssignor().isPresent());
- assertEquals(assignorName, selection.serverAssignor().get());
- }
-}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
index 598deace7f5..0856b4ebcb9 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -94,7 +94,7 @@ public class HeartbeatRequestManagerTest {
coordinatorRequestManager = mock(CoordinatorRequestManager.class);
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new
Node(1, "localhost", 9999)));
subscriptionState = mock(SubscriptionState.class);
- membershipManager = spy(new MembershipManagerImpl(GROUP_ID));
+ membershipManager = spy(new MembershipManagerImpl(GROUP_ID,
logContext));
heartbeatRequestState =
mock(HeartbeatRequestManager.HeartbeatRequestState.class);
errorEventHandler = mock(ErrorEventHandler.class);
heartbeatRequestManager = createManager();
@@ -234,7 +234,7 @@ public class HeartbeatRequestManagerTest {
Properties prop = createConsumerConfig();
prop.setProperty(MAX_POLL_INTERVAL_MS_CONFIG, "10000");
config = new ConsumerConfig(prop);
- membershipManager = new MembershipManagerImpl(GROUP_ID,
GROUP_INSTANCE_ID, null);
+ membershipManager = new MembershipManagerImpl(GROUP_ID,
GROUP_INSTANCE_ID, null, logContext);
heartbeatRequestState = new
HeartbeatRequestManager.HeartbeatRequestState(
logContext,
time,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index 4b6aa80c04e..d78bbf2ab63 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -21,9 +21,12 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -35,40 +38,26 @@ public class MembershipManagerImplTest {
private static final String GROUP_ID = "test-group";
private static final String MEMBER_ID = "test-member-1";
private static final int MEMBER_EPOCH = 1;
+ private final LogContext logContext = new LogContext();
@Test
- public void testMembershipManagerDefaultAssignor() {
- MembershipManagerImpl membershipManager = new
MembershipManagerImpl(GROUP_ID);
- assertEquals(AssignorSelection.defaultAssignor(),
membershipManager.assignorSelection());
+ public void testMembershipManagerServerAssignor() {
+ MembershipManagerImpl membershipManager = new
MembershipManagerImpl(GROUP_ID, logContext);
+ assertEquals(Optional.empty(), membershipManager.serverAssignor());
- membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1",
null);
- assertEquals(AssignorSelection.defaultAssignor(),
membershipManager.assignorSelection());
- }
-
- @Test
- public void testMembershipManagerAssignorSelectionUpdate() {
- AssignorSelection firstAssignorSelection =
AssignorSelection.newServerAssignor("uniform");
- MembershipManagerImpl membershipManager = new
MembershipManagerImpl(GROUP_ID, "instance1",
- firstAssignorSelection);
- assertEquals(firstAssignorSelection,
membershipManager.assignorSelection());
-
- AssignorSelection secondAssignorSelection =
AssignorSelection.newServerAssignor("range");
- membershipManager.setAssignorSelection(secondAssignorSelection);
- assertEquals(secondAssignorSelection,
membershipManager.assignorSelection());
-
- assertThrows(IllegalArgumentException.class,
- () -> membershipManager.setAssignorSelection(null));
+ membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1",
"Uniform", logContext);
+ assertEquals(Optional.of("Uniform"),
membershipManager.serverAssignor());
}
@Test
public void testMembershipManagerInitSupportsEmptyGroupInstanceId() {
- new MembershipManagerImpl(GROUP_ID);
- new MembershipManagerImpl(GROUP_ID, null,
AssignorSelection.defaultAssignor());
+ new MembershipManagerImpl(GROUP_ID, logContext);
+ new MembershipManagerImpl(GROUP_ID, null, null, logContext);
}
@Test
public void testTransitionToReconcilingOnlyIfAssignmentReceived() {
- MembershipManagerImpl membershipManager = new
MembershipManagerImpl(GROUP_ID);
+ MembershipManagerImpl membershipManager = new
MembershipManagerImpl(GROUP_ID, logContext);
assertEquals(MemberState.UNJOINED, membershipManager.state());
ConsumerGroupHeartbeatResponse responseWithoutAssignment =
@@ -84,7 +73,7 @@ public class MembershipManagerImplTest {
@Test
public void testMemberIdAndEpochResetOnFencedMembers() {
- MembershipManagerImpl membershipManager = new
MembershipManagerImpl(GROUP_ID);
+ MembershipManagerImpl membershipManager = new
MembershipManagerImpl(GROUP_ID, logContext);
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(null);
membershipManager.updateState(heartbeatResponse.data());
@@ -99,7 +88,7 @@ public class MembershipManagerImplTest {
@Test
public void testTransitionToFailure() {
- MembershipManagerImpl membershipManager = new
MembershipManagerImpl(GROUP_ID);
+ MembershipManagerImpl membershipManager = new
MembershipManagerImpl(GROUP_ID, logContext);
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(null);
membershipManager.updateState(heartbeatResponse.data());
@@ -112,62 +101,144 @@ public class MembershipManagerImplTest {
}
@Test
- public void testUpdateAssignment() {
- MembershipManagerImpl membershipManager = new
MembershipManagerImpl(GROUP_ID);
+ public void testFencingWhenStateIsStable() {
+ MembershipManagerImpl membershipManager = new
MembershipManagerImpl(GROUP_ID, logContext);
+ ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(null);
+ membershipManager.updateState(heartbeatResponse.data());
+ assertEquals(MemberState.STABLE, membershipManager.state());
+
+ testStateUpdateOnFenceError(membershipManager);
+ }
+
+ @Test
+ public void testFencingWhenStateIsReconciling() {
+ MembershipManagerImpl membershipManager = new
MembershipManagerImpl(GROUP_ID, logContext);
+ ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(createAssignment());
+ membershipManager.updateState(heartbeatResponse.data());
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ testStateUpdateOnFenceError(membershipManager);
+ }
+
+ @Test
+ public void testFatalFailureWhenStateIsUnjoined() {
+ MembershipManagerImpl membershipManager = new
MembershipManagerImpl(GROUP_ID, logContext);
+ assertEquals(MemberState.UNJOINED, membershipManager.state());
+
+ testStateUpdateOnFatalFailure(membershipManager);
+ }
+
+ @Test
+ public void testFatalFailureWhenStateIsStable() {
+ MembershipManagerImpl membershipManager = new
MembershipManagerImpl(GROUP_ID, logContext);
+ ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(null);
+ membershipManager.updateState(heartbeatResponse.data());
+ assertEquals(MemberState.STABLE, membershipManager.state());
+
+ testStateUpdateOnFatalFailure(membershipManager);
+ }
+
+ @Test
+ public void testFencingShouldNotHappenWhenStateIsUnjoined() {
+ MembershipManagerImpl membershipManager = new
MembershipManagerImpl(GROUP_ID, logContext);
+ assertEquals(MemberState.UNJOINED, membershipManager.state());
+
+ // Getting fenced when the member is not part of the group is not
expected and should
+ // fail with invalid transition.
+ assertThrows(IllegalStateException.class,
membershipManager::transitionToFenced);
+ }
+
+ @Test
+ public void testUpdateStateFailsOnResponsesWithErrors() {
+ MembershipManagerImpl membershipManager = new
MembershipManagerImpl(GROUP_ID, logContext);
+ // Updating state with a heartbeat response containing errors cannot
be performed and
+ // should fail.
+ ConsumerGroupHeartbeatResponse unknownMemberResponse =
+
createConsumerGroupHeartbeatResponseWithError(Errors.UNKNOWN_MEMBER_ID);
+ assertThrows(IllegalArgumentException.class,
+ () ->
membershipManager.updateState(unknownMemberResponse.data()));
+ }
+
+ @Test
+ public void testAssignmentUpdatedAsReceivedAndProcessed() {
+ MembershipManagerImpl membershipManager = new
MembershipManagerImpl(GROUP_ID, logContext);
ConsumerGroupHeartbeatResponseData.Assignment newAssignment =
createAssignment();
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(newAssignment);
membershipManager.updateState(heartbeatResponse.data());
// Target assignment should be in the process of being reconciled
- checkAssignments(membershipManager, null, newAssignment, null);
+ checkAssignments(membershipManager, null, newAssignment);
+ // Mark assignment processing completed
+ membershipManager.onTargetAssignmentProcessComplete(newAssignment);
+ // Target assignment should now be the current assignment
+ checkAssignments(membershipManager, newAssignment, null);
}
@Test
- public void testUpdateAssignmentReceivingAssignmentWhileAnotherInProcess()
{
- MembershipManagerImpl membershipManager = new
MembershipManagerImpl(GROUP_ID);
+ public void
testMemberFailsIfAssignmentReceivedWhileAnotherOnBeingReconciled() {
+ MembershipManagerImpl membershipManager = new
MembershipManagerImpl(GROUP_ID, logContext);
ConsumerGroupHeartbeatResponseData.Assignment newAssignment1 =
createAssignment();
membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment1).data());
// First target assignment received should be in the process of being
reconciled
- checkAssignments(membershipManager, null, newAssignment1, null);
+ checkAssignments(membershipManager, null, newAssignment1);
// Second target assignment received while there is another one being
reconciled
ConsumerGroupHeartbeatResponseData.Assignment newAssignment2 =
createAssignment();
-
membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment2).data());
- checkAssignments(membershipManager, null, newAssignment1,
newAssignment2);
+ assertThrows(IllegalStateException.class,
+ () ->
membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment2).data()));
+ assertEquals(MemberState.FAILED, membershipManager.state());
}
@Test
- public void
testNextTargetAssignmentHoldsLatestAssignmentReceivedWhileAnotherInProcess() {
- MembershipManagerImpl membershipManager = new
MembershipManagerImpl(GROUP_ID);
- ConsumerGroupHeartbeatResponseData.Assignment newAssignment1 =
createAssignment();
-
membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment1).data());
-
- // First target assignment received, remains in the process of being
reconciled
- checkAssignments(membershipManager, null, newAssignment1, null);
-
- // Second target assignment received while there is another one being
reconciled
- ConsumerGroupHeartbeatResponseData.Assignment newAssignment2 =
createAssignment();
-
membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment2).data());
- checkAssignments(membershipManager, null, newAssignment1,
newAssignment2);
+ public void
testAssignmentUpdatedFailsIfAssignmentReconciledDoesNotMatchTargetAssignment() {
+ MembershipManagerImpl membershipManager = new
MembershipManagerImpl(GROUP_ID, logContext);
+ ConsumerGroupHeartbeatResponseData.Assignment targetAssignment = new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(Collections.singletonList(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(Uuid.randomUuid())
+ .setPartitions(Arrays.asList(0, 1, 2))));
+ ConsumerGroupHeartbeatResponse heartbeatResponse =
+ createConsumerGroupHeartbeatResponse(targetAssignment);
+ membershipManager.updateState(heartbeatResponse.data());
- // If more assignments are received while there is one being
reconciled, the most recent
- // assignment received is kept as nextTargetAssignment
- ConsumerGroupHeartbeatResponseData.Assignment newAssignment3 =
createAssignment();
-
membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment3).data());
- checkAssignments(membershipManager, null, newAssignment1,
newAssignment3);
+ // Target assignment should be in the process of being reconciled
+ checkAssignments(membershipManager, null, targetAssignment);
+ // Mark assignment processing completed
+ ConsumerGroupHeartbeatResponseData.Assignment reconciled =
+ new ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(Collections.singletonList(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(Uuid.randomUuid())
+
.setPartitions(Collections.singletonList(0))));
+ assertThrows(IllegalStateException.class, () ->
membershipManager.onTargetAssignmentProcessComplete(reconciled));
}
private void checkAssignments(
MembershipManagerImpl membershipManager,
ConsumerGroupHeartbeatResponseData.Assignment
expectedCurrentAssignment,
- ConsumerGroupHeartbeatResponseData.Assignment
expectedTargetAssignment,
- ConsumerGroupHeartbeatResponseData.Assignment
expectedNextTargetAssignment) {
- assertEquals(expectedCurrentAssignment,
membershipManager.assignment());
+ ConsumerGroupHeartbeatResponseData.Assignment
expectedTargetAssignment) {
+ assertEquals(expectedCurrentAssignment,
membershipManager.currentAssignment());
assertEquals(expectedTargetAssignment,
membershipManager.targetAssignment().orElse(null));
- assertEquals(expectedNextTargetAssignment,
membershipManager.nextTargetAssignment().orElse(null));
+ }
+ private void testStateUpdateOnFenceError(MembershipManager
membershipManager) {
+ membershipManager.transitionToFenced();
+ assertEquals(MemberState.FENCED, membershipManager.state());
+ // Should reset member epoch and keep member id
+ assertFalse(membershipManager.memberId().isEmpty());
+ assertEquals(0, membershipManager.memberEpoch());
+ }
+
+ private void testStateUpdateOnFatalFailure(MembershipManager
membershipManager) {
+ String initialMemberId = membershipManager.memberId();
+ int initialMemberEpoch = membershipManager.memberEpoch();
+ membershipManager.transitionToFailed();
+ assertEquals(MemberState.FAILED, membershipManager.state());
+ // Should not reset member id or epoch
+ assertEquals(initialMemberId, membershipManager.memberId());
+ assertEquals(initialMemberEpoch, membershipManager.memberEpoch());
}
private ConsumerGroupHeartbeatResponse
createConsumerGroupHeartbeatResponse(ConsumerGroupHeartbeatResponseData.Assignment
assignment) {