This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6c23a900fcb KAFKA-15278: Implement HeartbeatRequestManager to handle
heartbeat requests (#14364)
6c23a900fcb is described below
commit 6c23a900fcbb2b8c2c092a355921b3424041ab4f
Author: Philip Nee <[email protected]>
AuthorDate: Mon Oct 9 08:35:42 2023 -0700
KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
(#14364)
HeartbeatRequestManager is responsible for handling the
ConsumerGroupHeartbeat request and response. The manager has the following
responsibilities:
1. Sending the request to the GroupCoordinator when it is possible and
necessary
2. Handling the response and updating the `MembershipManagerImpl` based on
the error/response it receives
3. Handling request retries and fatal failures
For a successful heartbeat response:
- Update the MembershipManager
For failure handling:
- Retriable errors: back off and retry
- Fenced: transition to a fenced state, reset the epoch, and retry in
the next poll
- Fatal: propagate the error to the user and fail the state machine
Reviewers: Kirk True <[email protected]>, Lianet Magrans
<[email protected]>, David Jacot <[email protected]>
---
.../consumer/internals/AssignorSelection.java | 14 +-
.../internals/DefaultBackgroundThread.java | 64 ++--
.../internals/HeartbeatRequestManager.java | 385 +++++++++++++++++++
.../consumer/internals/MembershipManager.java | 16 +
.../consumer/internals/MembershipManagerImpl.java | 47 ++-
.../consumer/internals/RequestManagers.java | 6 +-
.../internals/events/ErrorBackgroundEvent.java | 6 +-
.../message/ConsumerGroupHeartbeatRequest.json | 2 +-
.../clients/consumer/AssignorSelectionTest.java | 4 +-
.../internals/DefaultBackgroundThreadTest.java | 120 +++---
.../internals/HeartbeatRequestManagerTest.java | 409 +++++++++++++++++++++
.../internals/MembershipManagerImplTest.java | 41 +--
12 files changed, 985 insertions(+), 129 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java
index 1d8d0c8cc51..5eaae957ea5 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java
@@ -18,6 +18,7 @@
package org.apache.kafka.clients.consumer.internals;
import java.util.Objects;
+import java.util.Optional;
/**
* Selection of a type of assignor used by a member to get partitions assigned
as part of a
@@ -30,12 +31,12 @@ public class AssignorSelection {
public enum Type { SERVER }
private final AssignorSelection.Type type;
- private String serverAssignor;
+ private final Optional<String> serverAssignor;
private AssignorSelection(Type type, String serverAssignor) {
this.type = type;
if (type == Type.SERVER) {
- this.serverAssignor = serverAssignor;
+ this.serverAssignor = Optional.ofNullable(serverAssignor);
} else {
throw new IllegalArgumentException("Unsupported assignor type " +
type);
}
@@ -53,10 +54,10 @@ public class AssignorSelection {
public static AssignorSelection defaultAssignor() {
// TODO: review default selection
- return new AssignorSelection(Type.SERVER, "uniform");
+ return new AssignorSelection(Type.SERVER, null);
}
- public String serverAssignor() {
+ public Optional<String> serverAssignor() {
return this.serverAssignor;
}
@@ -79,6 +80,9 @@ public class AssignorSelection {
@Override
public String toString() {
- return String.format("Assignor selection {type:%s, name:%s}", type,
serverAssignor);
+ return "AssignorSelection(" +
+ "type=" + type + ", " +
+ "serverAssignor='" + serverAssignor + '\'' +
+ ')';
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
index 90e27559f90..774f4a141b8 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
@@ -91,7 +91,8 @@ public class DefaultBackgroundThread extends KafkaThread {
final CoordinatorRequestManager coordinatorManager,
final CommitRequestManager commitRequestManager,
final OffsetsRequestManager offsetsRequestManager,
- final TopicMetadataRequestManager
topicMetadataRequestManager) {
+ final TopicMetadataRequestManager
topicMetadataRequestManager,
+ final HeartbeatRequestManager
heartbeatRequestManager) {
super(BACKGROUND_THREAD_NAME, true);
this.time = time;
this.running = true;
@@ -108,7 +109,8 @@ public class DefaultBackgroundThread extends KafkaThread {
offsetsRequestManager,
topicMetadataRequestManager,
Optional.ofNullable(coordinatorManager),
- Optional.ofNullable(commitRequestManager));
+ Optional.ofNullable(commitRequestManager),
+ Optional.ofNullable(heartbeatRequestManager));
}
public DefaultBackgroundThread(final Time time,
@@ -138,19 +140,19 @@ public class DefaultBackgroundThread extends KafkaThread {
this.config = config;
this.metadata = metadata;
final NetworkClient networkClient =
ClientUtils.createNetworkClient(config,
- metrics,
- CONSUMER_METRIC_GROUP_PREFIX,
- logContext,
- apiVersions,
- time,
- CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION,
- metadata,
- fetcherThrottleTimeSensor);
+ metrics,
+ CONSUMER_METRIC_GROUP_PREFIX,
+ logContext,
+ apiVersions,
+ time,
+ CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION,
+ metadata,
+ fetcherThrottleTimeSensor);
this.networkClientDelegate = new NetworkClientDelegate(
- this.time,
- this.config,
- logContext,
- networkClient);
+ this.time,
+ this.config,
+ logContext,
+ networkClient);
this.running = true;
this.errorEventHandler = new
ErrorEventHandler(this.backgroundEventQueue);
this.groupState = new GroupState(rebalanceConfig);
@@ -159,22 +161,24 @@ public class DefaultBackgroundThread extends KafkaThread {
final int requestTimeoutMs =
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
OffsetsRequestManager offsetsRequestManager =
- new OffsetsRequestManager(
- subscriptionState,
- metadata,
- configuredIsolationLevel(config),
- time,
- retryBackoffMs,
- requestTimeoutMs,
- apiVersions,
- networkClientDelegate,
- logContext);
+ new OffsetsRequestManager(
+ subscriptionState,
+ metadata,
+ configuredIsolationLevel(config),
+ time,
+ retryBackoffMs,
+ requestTimeoutMs,
+ apiVersions,
+ networkClientDelegate,
+ logContext);
CoordinatorRequestManager coordinatorRequestManager = null;
CommitRequestManager commitRequestManager = null;
TopicMetadataRequestManager topicMetadataRequestManger = new
TopicMetadataRequestManager(
logContext,
config);
+ HeartbeatRequestManager heartbeatRequestManager = null;
+ // TODO: consolidate groupState and memberState
if (groupState.groupId != null) {
coordinatorRequestManager = new CoordinatorRequestManager(
this.time,
@@ -190,13 +194,23 @@ public class DefaultBackgroundThread extends KafkaThread {
config,
coordinatorRequestManager,
groupState);
+ MembershipManager membershipManager = new
MembershipManagerImpl(groupState.groupId);
+ heartbeatRequestManager = new HeartbeatRequestManager(
+ this.time,
+ logContext,
+ config,
+ coordinatorRequestManager,
+ subscriptionState,
+ membershipManager,
+ errorEventHandler);
}
this.requestManagers = new RequestManagers(
offsetsRequestManager,
topicMetadataRequestManger,
Optional.ofNullable(coordinatorRequestManager),
- Optional.ofNullable(commitRequestManager));
+ Optional.ofNullable(commitRequestManager),
+ Optional.ofNullable(heartbeatRequestManager));
this.applicationEventProcessor = new ApplicationEventProcessor(
backgroundEventQueue,
requestManagers,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
new file mode 100644
index 00000000000..7941c4effb2
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * <p>Manages the request creation and response handling for the heartbeat. The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link MembershipManager} and enqueues it to
+ * the network queue to be sent out. Once the response is received, the module will update the state in the
+ * {@link MembershipManager} and handle any errors.</p>
+ *
+ * <p>The manager will try to send a heartbeat when the member is in {@link MemberState#STABLE},
+ * {@link MemberState#UNJOINED}, or {@link MemberState#RECONCILING}, which means the member is either in a stable
+ * group, is trying to join a group, or is in the process of reconciling the assignment changes.</p>
+ *
+ * <p>If the member gets kicked out of a group, it will try to give up the current assignment by invoking {@code
+ * OnPartitionsLost} before attempting to join again with a zero epoch.</p>
+ *
+ * <p>If the member does not have a groupId configured or has encountered a fatal exception, a heartbeat will not be sent.</p>
+ *
+ * <p>If the coordinator is not found, we will skip sending the heartbeat and try to find a coordinator first.</p>
+ *
+ * <p>If the heartbeat fails due to a retriable error, such as a TimeoutException, the subsequent attempts will
+ * back off exponentially.</p>
+ *
+ * <p>When the member completes the assignment reconciliation, the {@link
HeartbeatRequestState} will be reset so
+ * that a heartbeat will be sent in the next event loop.</p>
+ *
+ * <p>See {@link HeartbeatRequestState} for more details.</p>
+ */
+public class HeartbeatRequestManager implements RequestManager {
+ private final Logger logger;
+ private final Time time;
+
+    /**
+     * Time that the group coordinator will wait on member to revoke its partitions. It is read from the
+     * {@code MAX_POLL_INTERVAL_MS_CONFIG} consumer configuration and sent to the group coordinator in the
+     * heartbeat request.
+     */
+ private final int rebalanceTimeoutMs;
+
+ /**
+ * CoordinatorRequestManager manages the connection to the group
coordinator
+ */
+ private final CoordinatorRequestManager coordinatorRequestManager;
+
+ /**
+ * SubscriptionState tracks the topic, partition and offset of the member
+ */
+ private final SubscriptionState subscriptions;
+
+ /**
+ * HeartbeatRequestState manages heartbeat request timing and retries
+ */
+ private final HeartbeatRequestState heartbeatRequestState;
+
+ /**
+ * MembershipManager manages member's essential attributes like epoch and
id, and its rebalance state
+ */
+ private final MembershipManager membershipManager;
+
+ /**
+ * ErrorEventHandler allows the background thread to propagate errors back
to the user
+ */
+ private final ErrorEventHandler nonRetriableErrorHandler;
+
+    /**
+     * Creates the manager and builds its {@link HeartbeatRequestState} from the consumer configuration.
+     *
+     * @param time                      clock used for the heartbeat timer and to timestamp failed requests
+     * @param logContext                context used to create this manager's logger
+     * @param config                    consumer configuration; the max poll interval and the retry backoff
+     *                                  settings are read from it
+     * @param coordinatorRequestManager manages the connection to the group coordinator
+     * @param subscriptions             tracks the member's subscription
+     * @param membershipManager         holds the member id, epoch and rebalance state
+     * @param nonRetriableErrorHandler  receives fatal errors so that they can be propagated to the user
+     */
+    public HeartbeatRequestManager(
+        final Time time,
+        final LogContext logContext,
+        final ConsumerConfig config,
+        final CoordinatorRequestManager coordinatorRequestManager,
+        final SubscriptionState subscriptions,
+        final MembershipManager membershipManager,
+        final ErrorEventHandler nonRetriableErrorHandler) {
+        this.coordinatorRequestManager = coordinatorRequestManager;
+        this.time = time;
+        this.logger = logContext.logger(getClass());
+        this.subscriptions = subscriptions;
+        this.membershipManager = membershipManager;
+        this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+        this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+        long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+        long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+        // The six-argument HeartbeatRequestState overload takes the backoff jitter (a double) as its last
+        // parameter, so the rebalance timeout must not be passed there. Use the five-argument overload, which
+        // does not take a jitter argument.
+        // The heartbeat interval starts at 0; it is updated from each successful heartbeat response.
+        this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs,
+            retryBackoffMaxMs);
+    }
+
+    /**
+     * Package-private constructor, visible for testing. It takes a pre-built {@link HeartbeatRequestState}
+     * instead of creating one from the configuration.
+     */
+    HeartbeatRequestManager(
+        final LogContext logContext,
+        final Time time,
+        final ConsumerConfig config,
+        final CoordinatorRequestManager coordinatorRequestManager,
+        final SubscriptionState subscriptions,
+        final MembershipManager membershipManager,
+        final HeartbeatRequestState heartbeatRequestState,
+        final ErrorEventHandler nonRetriableErrorHandler) {
+        // Collaborators handed in by the caller
+        this.time = time;
+        this.coordinatorRequestManager = coordinatorRequestManager;
+        this.subscriptions = subscriptions;
+        this.membershipManager = membershipManager;
+        this.heartbeatRequestState = heartbeatRequestState;
+        this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+        // Values derived from the arguments
+        this.logger = logContext.logger(this.getClass());
+        this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+    }
+
+    /**
+     * Decides whether a heartbeat must be sent now and reports how long the caller may wait before polling
+     * again.
+     * <ol>
+     *     <li>Without a known coordinator, or when the membership manager reports that no heartbeat should be
+     *     sent, nothing is sent and the wait time is {@code Long.MAX_VALUE}.</li>
+     *     <li>If the heartbeat timer has expired but the request is still backing off, nothing is sent and the
+     *     wait time is the remaining backoff.</li>
+     *     <li>If the heartbeat timer has not expired, nothing is sent and the wait time is the time left on the
+     *     heartbeat timer.</li>
+     *     <li>Otherwise one heartbeat request is returned and the wait time is the current heartbeat
+     *     interval.</li>
+     * </ol>
+     *
+     * @param currentTimeMs the current time in milliseconds
+     * @return the wait time together with zero or one unsent heartbeat request
+     */
+    @Override
+    public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+        final boolean coordinatorKnown = coordinatorRequestManager.coordinator().isPresent();
+        if (!(coordinatorKnown && membershipManager.shouldSendHeartbeat())) {
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList());
+        }
+
+        // TODO: We will need to send a heartbeat after partitions have been revoked. This needs to be
+        //  implemented either with or after the partition reconciliation logic.
+        if (heartbeatRequestState.canSendRequest(currentTimeMs)) {
+            heartbeatRequestState.onSendAttempt(currentTimeMs);
+            final NetworkClientDelegate.UnsentRequest heartbeat = makeHeartbeatRequest();
+            return new NetworkClientDelegate.PollResult(
+                heartbeatRequestState.heartbeatIntervalMs,
+                Collections.singletonList(heartbeat));
+        }
+
+        final long waitMs = heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
+        return new NetworkClientDelegate.PollResult(waitMs, Collections.emptyList());
+    }
+
+ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
+ // TODO: We only need to send the rebalanceTimeoutMs field once unless
the first request failed.
+ ConsumerGroupHeartbeatRequestData data = new
ConsumerGroupHeartbeatRequestData()
+ .setGroupId(membershipManager.groupId())
+ .setMemberEpoch(membershipManager.memberEpoch())
+ .setMemberId(membershipManager.memberId())
+ .setRebalanceTimeoutMs(rebalanceTimeoutMs);
+
+ membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
+
+ if (this.subscriptions.hasPatternSubscription()) {
+ // TODO: Pass the string to the GC if server side regex is used.
+ } else {
+ data.setSubscribedTopicNames(new
ArrayList<>(this.subscriptions.subscription()));
+ }
+
+
this.membershipManager.assignorSelection().serverAssignor().ifPresent(data::setServerAssignor);
+
+ NetworkClientDelegate.UnsentRequest request = new
NetworkClientDelegate.UnsentRequest(
+ new ConsumerGroupHeartbeatRequest.Builder(data),
+ coordinatorRequestManager.coordinator());
+ request.future().whenComplete((response, exception) -> {
+ if (response != null) {
+ onResponse((ConsumerGroupHeartbeatResponse)
response.responseBody(), response.receivedTimeMs());
+ } else {
+ // TODO: Currently, we lack a good way to propage the response
time from the network client to the
+ // request handler. We will need to store the response time
in the handler to make it accessible.
+ onFailure(exception, time.milliseconds());
+ }
+ });
+ return request;
+ }
+
+    /**
+     * Handles a heartbeat request that completed exceptionally, i.e. without a response.
+     *
+     * <p>The failed attempt is always recorded in the request state. A {@link RetriableException} is only
+     * logged; any other exception is treated as fatal and passed to {@code handleFatalFailure}.</p>
+     *
+     * @param exception      the exception the request future completed with
+     * @param responseTimeMs the time, in milliseconds, at which the failure was observed
+     */
+    private void onFailure(final Throwable exception, final long responseTimeMs) {
+        this.heartbeatRequestState.onFailedAttempt(responseTimeMs);
+        if (exception instanceof RetriableException) {
+            // Use the logger's own "{}" placeholders for both values. Mixing String.format's "%s" with "{}"
+            // left the exception message out of the log line and printed a literal "{}".
+            logger.debug("GroupHeartbeatRequest failed because of the retriable exception. Will retry in {} ms: {}",
+                heartbeatRequestState.remainingBackoffMs(responseTimeMs),
+                exception.getMessage());
+        } else {
+            logger.error("GroupHeartbeatRequest failed due to fatal error: {}", exception.getMessage());
+            handleFatalFailure(exception);
+        }
+    }
+
+ private void onResponse(final ConsumerGroupHeartbeatResponse response,
long currentTimeMs) {
+ if (Errors.forCode(response.data().errorCode()) == Errors.NONE) {
+
this.heartbeatRequestState.updateHeartbeatIntervalMs(response.data().heartbeatIntervalMs());
+ this.heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
+ this.heartbeatRequestState.resetTimer();
+ this.membershipManager.updateState(response.data());
+ return;
+ }
+ onErrorResponse(response, currentTimeMs);
+ }
+
+ private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
+ final long currentTimeMs) {
+ Errors error = Errors.forCode(response.data().errorCode());
+ String errorMessage = response.data().errorMessage();
+ // TODO: upon encountering a fatal/fenced error, trigger
onPartitionLost logic to give up the current
+ // assignments.
+ switch (error) {
+ case NOT_COORDINATOR:
+ // the manager should retry immediately when the coordinator
node becomes available again
+ String message = String.format("GroupHeartbeatRequest failed
because the group coordinator %s is incorrect. " +
+ "Will attempt to find the coordinator again
and retry",
+ coordinatorRequestManager.coordinator());
+ logInfo(message, response, currentTimeMs);
+ coordinatorRequestManager.markCoordinatorUnknown(errorMessage,
currentTimeMs);
+ break;
+
+ case COORDINATOR_NOT_AVAILABLE:
+ message = String.format("GroupHeartbeatRequest failed because
the group coordinator %s is not available. " +
+ "Will attempt to find the coordinator again
and retry",
+ coordinatorRequestManager.coordinator());
+ logInfo(message, response, currentTimeMs);
+ coordinatorRequestManager.markCoordinatorUnknown(errorMessage,
currentTimeMs);
+ break;
+
+ case COORDINATOR_LOAD_IN_PROGRESS:
+ // the manager will backoff and retry
+ message = String.format("GroupHeartbeatRequest failed because
the group coordinator %s is still loading." +
+ "Will retry",
+ coordinatorRequestManager.coordinator());
+ logInfo(message, response, currentTimeMs);
+ heartbeatRequestState.onFailedAttempt(currentTimeMs);
+ break;
+
+ case GROUP_AUTHORIZATION_FAILED:
+ GroupAuthorizationException exception =
+
GroupAuthorizationException.forGroupId(membershipManager.groupId());
+ logger.error("GroupHeartbeatRequest failed due to group
authorization failure: {}", exception.getMessage());
+ handleFatalFailure(error.exception(exception.getMessage()));
+ break;
+
+ case UNRELEASED_INSTANCE_ID:
+ logger.error("GroupHeartbeatRequest failed due to the instance
id {} was not released: {}",
+ membershipManager.groupInstanceId().orElse("null"),
errorMessage);
+
handleFatalFailure(Errors.UNRELEASED_INSTANCE_ID.exception(errorMessage));
+ break;
+
+ case INVALID_REQUEST:
+ case GROUP_MAX_SIZE_REACHED:
+ case UNSUPPORTED_ASSIGNOR:
+ case UNSUPPORTED_VERSION:
+ logger.error("GroupHeartbeatRequest failed due to error: {}",
error);
+ handleFatalFailure(error.exception(errorMessage));
+ break;
+
+ case FENCED_MEMBER_EPOCH:
+ message = String.format("GroupHeartbeatRequest failed because
member epoch %s is invalid. " +
+ "Will abandon all partitions and rejoin the
group",
+ membershipManager.memberId(),
membershipManager.memberEpoch());
+ logInfo(message, response, currentTimeMs);
+ membershipManager.transitionToFenced();
+ break;
+
+ case UNKNOWN_MEMBER_ID:
+ message = String.format("GroupHeartbeatRequest failed because
member id %s is invalid. " +
+ "Will abandon all partitions and rejoin the
group",
+ membershipManager.memberId(),
membershipManager.memberEpoch());
+ logInfo(message, response, currentTimeMs);
+ membershipManager.transitionToFenced();
+ break;
+
+ default:
+ // If the manager receives an unknown error - there could be a
bug in the code or a new error code
+ logger.error("GroupHeartbeatRequest failed due to unexpected
error: {}", error);
+ handleFatalFailure(error.exception(errorMessage));
+ break;
+ }
+ }
+
+ private void logInfo(final String message,
+ final ConsumerGroupHeartbeatResponse response,
+ final long currentTimeMs) {
+ logger.info("{} in {}ms: {}",
+ message,
+ heartbeatRequestState.remainingBackoffMs(currentTimeMs),
+ response.data().errorMessage());
+ }
+
+    /**
+     * Passes a non-retriable error to the error handler, then transitions the member to the failed state.
+     *
+     * @param error the fatal error
+     */
+    private void handleFatalFailure(Throwable error) {
+        this.nonRetriableErrorHandler.handle(error);
+        this.membershipManager.transitionToFailed();
+    }
+
+    /**
+     * Represents the state of a heartbeat request, including logic for timing, retries, and exponential
+     * backoff. It extends {@link RequestState}, whose {@code canSendRequest} and {@code remainingBackoffMs}
+     * are reused here. The two fields that it adds are the heartbeat timer and the heartbeat interval.
+     */
+    static class HeartbeatRequestState extends RequestState {
+        /**
+         * Timer counting down the current heartbeat interval; a heartbeat may be sent once it has expired
+         */
+        private final Timer heartbeatTimer;
+
+        /**
+         * The heartbeat interval, updated from the heartbeat response
+         */
+        private long heartbeatIntervalMs;
+
+        /**
+         * Creates the state without passing a jitter value to {@link RequestState}.
+         */
+        public HeartbeatRequestState(
+            final LogContext logContext,
+            final Time time,
+            final long heartbeatIntervalMs,
+            final long retryBackoffMs,
+            final long retryBackoffMaxMs) {
+            super(logContext, HeartbeatRequestState.class.getName(), retryBackoffMs, retryBackoffMaxMs);
+            this.heartbeatTimer = time.timer(heartbeatIntervalMs);
+            this.heartbeatIntervalMs = heartbeatIntervalMs;
+        }
+
+        /**
+         * Creates the state with an explicit backoff {@code jitter}.
+         */
+        public HeartbeatRequestState(
+            final LogContext logContext,
+            final Time time,
+            final long heartbeatIntervalMs,
+            final long retryBackoffMs,
+            final long retryBackoffMaxMs,
+            final double jitter) {
+            super(logContext, HeartbeatRequestState.class.getName(), retryBackoffMs, 2, retryBackoffMaxMs, jitter);
+            this.heartbeatTimer = time.timer(heartbeatIntervalMs);
+            this.heartbeatIntervalMs = heartbeatIntervalMs;
+        }
+
+        /**
+         * Brings the heartbeat timer up to {@code currentTimeMs}.
+         */
+        private void update(final long currentTimeMs) {
+            heartbeatTimer.update(currentTimeMs);
+        }
+
+        /**
+         * Restarts the heartbeat timer with the current heartbeat interval.
+         */
+        public void resetTimer() {
+            heartbeatTimer.reset(this.heartbeatIntervalMs);
+        }
+
+        /**
+         * A heartbeat can be sent only when the heartbeat timer has expired and the parent
+         * {@link RequestState} also allows a request at {@code currentTimeMs}.
+         */
+        @Override
+        public boolean canSendRequest(final long currentTimeMs) {
+            update(currentTimeMs);
+            if (!heartbeatTimer.isExpired()) {
+                return false;
+            }
+            return super.canSendRequest(currentTimeMs);
+        }
+
+        /**
+         * Returns the time left on the heartbeat timer, or the remaining backoff when the timer has nothing
+         * left.
+         */
+        public long nextHeartbeatMs(final long currentTimeMs) {
+            final long timerRemainingMs = heartbeatTimer.remainingMs();
+            return timerRemainingMs == 0 ? this.remainingBackoffMs(currentTimeMs) : timerRemainingMs;
+        }
+
+        /**
+         * Stores a new heartbeat interval and restarts the timer with it. When the interval is unchanged,
+         * nothing is done, so the running timer is left untouched.
+         */
+        private void updateHeartbeatIntervalMs(final long heartbeatIntervalMs) {
+            if (this.heartbeatIntervalMs != heartbeatIntervalMs) {
+                this.heartbeatIntervalMs = heartbeatIntervalMs;
+                this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
+            }
+        }
+    }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
index a0a72e56336..c0fb9ed3903 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
@@ -59,4 +59,20 @@ public interface MembershipManager {
* current assignment.
*/
void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment
assignment);
+
+ /**
+ * Transition the member to the FENCED state. This is only invoked when
the heartbeat returns a
+ * FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID error code.
+ */
+ void transitionToFenced();
+
+ /**
+ * Transition the member to the FAILED state. This is invoked when the
heartbeat returns a non-retriable error.
+ */
+ void transitionToFailed();
+
+ /**
+ * Return true if the member should send heartbeat to the coordinator
+ */
+ boolean shouldSendHeartbeat();
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index ad425f41d72..f4f97bb36f9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -115,25 +115,36 @@ public class MembershipManagerImpl implements
MembershipManager {
@Override
public void updateState(ConsumerGroupHeartbeatResponseData response) {
- if (response.errorCode() == Errors.NONE.code()) {
- this.memberId = response.memberId();
- this.memberEpoch = response.memberEpoch();
- ConsumerGroupHeartbeatResponseData.Assignment assignment =
response.assignment();
- if (assignment != null) {
- setTargetAssignment(assignment);
- }
- maybeTransitionToStable();
- } else {
- if (response.errorCode() == Errors.FENCED_MEMBER_EPOCH.code() ||
response.errorCode() == Errors.UNKNOWN_MEMBER_ID.code()) {
- resetEpoch();
- transitionTo(MemberState.FENCED);
- } else if (response.errorCode() ==
Errors.UNRELEASED_INSTANCE_ID.code()) {
- transitionTo(MemberState.FAILED);
- }
- // TODO: handle other errors here to update state accordingly,
mainly making the
- // distinction between the recoverable errors and the fatal ones,
that should FAILED
- // the member
+ if (response.errorCode() != Errors.NONE.code()) {
+ String errorMessage = String.format(
+ "Unexpected error in Heartbeat response. Expected no
error, but received: %s",
+ Errors.forCode(response.errorCode())
+ );
+ throw new IllegalStateException(errorMessage);
+ }
+ this.memberId = response.memberId();
+ this.memberEpoch = response.memberEpoch();
+ ConsumerGroupHeartbeatResponseData.Assignment assignment =
response.assignment();
+ if (assignment != null) {
+ setTargetAssignment(assignment);
}
+ maybeTransitionToStable();
+ }
+
+ @Override
+ public void transitionToFenced() {
+ resetEpoch();
+ transitionTo(MemberState.FENCED);
+ }
+
+ @Override
+ public void transitionToFailed() {
+ transitionTo(MemberState.FAILED);
+ }
+
+ @Override
+ public boolean shouldSendHeartbeat() {
+ return state() != MemberState.FAILED;
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 3864b1fcaa6..5a397532fe3 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -32,6 +32,7 @@ public class RequestManagers {
public final Optional<CoordinatorRequestManager> coordinatorRequestManager;
public final Optional<CommitRequestManager> commitRequestManager;
+ private final Optional<HeartbeatRequestManager> heartbeatRequestManager;
public final OffsetsRequestManager offsetsRequestManager;
public final TopicMetadataRequestManager topicMetadataRequestManager;
private final List<Optional<? extends RequestManager>> entries;
@@ -39,16 +40,19 @@ public class RequestManagers {
public RequestManagers(OffsetsRequestManager offsetsRequestManager,
TopicMetadataRequestManager
topicMetadataRequestManager,
Optional<CoordinatorRequestManager>
coordinatorRequestManager,
- Optional<CommitRequestManager>
commitRequestManager) {
+ Optional<CommitRequestManager> commitRequestManager,
+ Optional<HeartbeatRequestManager>
heartbeatRequestManager) {
this.offsetsRequestManager = requireNonNull(offsetsRequestManager,
"OffsetsRequestManager cannot be null");
this.coordinatorRequestManager = coordinatorRequestManager;
this.commitRequestManager = commitRequestManager;
this.topicMetadataRequestManager = topicMetadataRequestManager;
+ this.heartbeatRequestManager = heartbeatRequestManager;
List<Optional<? extends RequestManager>> list = new ArrayList<>();
list.add(coordinatorRequestManager);
list.add(commitRequestManager);
+ list.add(heartbeatRequestManager);
list.add(Optional.of(offsetsRequestManager));
list.add(Optional.of(topicMetadataRequestManager));
entries = Collections.unmodifiableList(list);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java
index 4fc08290b71..74d35983291 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java
@@ -50,8 +50,8 @@ public class ErrorBackgroundEvent extends BackgroundEvent {
@Override
public String toString() {
return "ErrorBackgroundEvent{" +
- toStringBase() +
- ", error=" + error +
- '}';
+ toStringBase() +
+ ", error=" + error +
+ '}';
}
}
\ No newline at end of file
diff --git
a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json
b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json
index a05db31618a..e86a79fcd68 100644
---
a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json
+++
b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json
@@ -36,7 +36,7 @@
{ "name": "RackId", "type": "string", "versions": "0+",
"nullableVersions": "0+", "default": "null",
"about": "null if not provided or if it didn't change since the last
heartbeat; the rack ID of consumer otherwise." },
{ "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+",
"default": -1,
- "about": "-1 if it didn't chance since the last heartbeat; the maximum
time in milliseconds that the coordinator will wait on the member to revoke its
partitions otherwise." },
+ "about": "-1 if it didn't change since the last heartbeat; the maximum
time in milliseconds that the coordinator will wait on the member to revoke its
partitions otherwise." },
{ "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+",
"nullableVersions": "0+", "default": "null", "entityType": "topicName",
"about": "null if it didn't change since the last heartbeat; the
subscribed topic names otherwise." },
{ "name": "SubscribedTopicRegex", "type": "string", "versions": "0+",
"nullableVersions": "0+", "default": "null",
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java
index bdae8adcbe8..57c61e3dc14 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java
@@ -23,6 +23,7 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class AssignorSelectionTest {
@@ -51,6 +52,7 @@ public class AssignorSelectionTest {
String assignorName = "uniform";
AssignorSelection selection =
AssignorSelection.newServerAssignor(assignorName);
assertEquals(AssignorSelection.Type.SERVER, selection.type());
- assertEquals(assignorName, selection.serverAssignor());
+ assertTrue(selection.serverAssignor().isPresent());
+ assertEquals(assignorName, selection.serverAssignor().get());
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
index 137bd106d6e..a7d8ac0a61a 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
@@ -86,6 +86,7 @@ public class DefaultBackgroundThreadTest {
private GroupState groupState;
private CommitRequestManager commitManager;
private TopicMetadataRequestManager topicMetadataRequestManager;
+ private HeartbeatRequestManager heartbeatRequestManager;
@BeforeEach
@SuppressWarnings("unchecked")
@@ -98,6 +99,7 @@ public class DefaultBackgroundThreadTest {
this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
this.coordinatorManager = mock(CoordinatorRequestManager.class);
this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+ this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
this.errorEventHandler = mock(ErrorEventHandler.class);
GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(
100,
@@ -117,8 +119,9 @@ public class DefaultBackgroundThreadTest {
public void testStartupAndTearDown() throws InterruptedException {
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
-
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
DefaultBackgroundThread backgroundThread = mockBackgroundThread();
backgroundThread.start();
TestUtils.waitForCondition(backgroundThread::isRunning, "Failed
awaiting for the background thread to be running");
@@ -130,10 +133,11 @@ public class DefaultBackgroundThreadTest {
public void testApplicationEvent() {
this.applicationEventsQueue = new LinkedBlockingQueue<>();
this.backgroundEventsQueue = new LinkedBlockingQueue<>();
-
when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
-
when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-
when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
-
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+ when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
DefaultBackgroundThread backgroundThread = mockBackgroundThread();
ApplicationEvent e = new NoopApplicationEvent("noop event");
this.applicationEventsQueue.add(e);
@@ -150,10 +154,11 @@ public class DefaultBackgroundThreadTest {
this.backgroundEventsQueue,
mockRequestManagers(),
metadata);
-
when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
-
when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-
when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
-
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+ when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
DefaultBackgroundThread backgroundThread = mockBackgroundThread();
ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
this.applicationEventsQueue.add(e);
@@ -166,10 +171,11 @@ public class DefaultBackgroundThreadTest {
public void testCommitEvent() {
this.applicationEventsQueue = new LinkedBlockingQueue<>();
this.backgroundEventsQueue = new LinkedBlockingQueue<>();
-
when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
-
when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-
when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
-
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+ when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
DefaultBackgroundThread backgroundThread = mockBackgroundThread();
ApplicationEvent e = new CommitApplicationEvent(new HashMap<>());
this.applicationEventsQueue.add(e);
@@ -183,8 +189,9 @@ public class DefaultBackgroundThreadTest {
public void testListOffsetsEventIsProcessed() {
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
-
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
this.applicationEventsQueue = new LinkedBlockingQueue<>();
this.backgroundEventsQueue = new LinkedBlockingQueue<>();
DefaultBackgroundThread backgroundThread = mockBackgroundThread();
@@ -201,8 +208,9 @@ public class DefaultBackgroundThreadTest {
public void testResetPositionsEventIsProcessed() {
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
-
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
this.applicationEventsQueue = new LinkedBlockingQueue<>();
this.backgroundEventsQueue = new LinkedBlockingQueue<>();
DefaultBackgroundThread backgroundThread = mockBackgroundThread();
@@ -218,7 +226,7 @@ public class DefaultBackgroundThreadTest {
public void testResetPositionsProcessFailure() {
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
this.applicationEventsQueue = new LinkedBlockingQueue<>();
this.backgroundEventsQueue = new LinkedBlockingQueue<>();
applicationEventProcessor = spy(new ApplicationEventProcessor(
@@ -242,8 +250,9 @@ public class DefaultBackgroundThreadTest {
public void testValidatePositionsEventIsProcessed() {
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
-
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
this.applicationEventsQueue = new LinkedBlockingQueue<>();
this.backgroundEventsQueue = new LinkedBlockingQueue<>();
DefaultBackgroundThread backgroundThread = mockBackgroundThread();
@@ -259,7 +268,7 @@ public class DefaultBackgroundThreadTest {
public void testValidatePositionsProcessFailure() {
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
this.applicationEventsQueue = new LinkedBlockingQueue<>();
this.backgroundEventsQueue = new LinkedBlockingQueue<>();
applicationEventProcessor = spy(new ApplicationEventProcessor(
@@ -295,10 +304,11 @@ public class DefaultBackgroundThreadTest {
ApplicationEvent e = new AssignmentChangeApplicationEvent(offset,
currentTimeMs);
this.applicationEventsQueue.add(e);
-
when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
-
when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-
when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
-
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+ when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
backgroundThread.runOnce();
verify(applicationEventProcessor).process(any(AssignmentChangeApplicationEvent.class));
@@ -312,10 +322,11 @@ public class DefaultBackgroundThreadTest {
@Test
void testFindCoordinator() {
DefaultBackgroundThread backgroundThread = mockBackgroundThread();
-
when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
-
when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-
when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
-
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+ when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
backgroundThread.runOnce();
Mockito.verify(coordinatorManager, times(1)).poll(anyLong());
Mockito.verify(networkClient, times(1)).poll(anyLong(), anyLong());
@@ -326,10 +337,11 @@ public class DefaultBackgroundThreadTest {
void testFetchTopicMetadata() {
this.applicationEventsQueue = new LinkedBlockingQueue<>();
DefaultBackgroundThread backgroundThread = mockBackgroundThread();
-
when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
-
when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-
when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
-
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+ when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
this.applicationEventsQueue.add(new
TopicMetadataApplicationEvent("topic"));
backgroundThread.runOnce();
verify(applicationEventProcessor).process(any(TopicMetadataApplicationEvent.class));
@@ -362,10 +374,11 @@ public class DefaultBackgroundThreadTest {
private RequestManagers mockRequestManagers() {
return new RequestManagers(
- offsetsRequestManager,
- topicMetadataRequestManager,
- Optional.of(coordinatorManager),
- Optional.of(commitManager));
+ offsetsRequestManager,
+ topicMetadataRequestManager,
+ Optional.of(coordinatorManager),
+ Optional.of(commitManager),
+ Optional.of(heartbeatRequestManager));
}
private static NetworkClientDelegate.UnsentRequest
findCoordinatorUnsentRequest(
@@ -388,20 +401,21 @@ public class DefaultBackgroundThreadTest {
properties.put(RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS);
return new DefaultBackgroundThread(
- this.time,
- new ConsumerConfig(properties),
- new LogContext(),
- applicationEventsQueue,
- backgroundEventsQueue,
- this.errorEventHandler,
- applicationEventProcessor,
- this.metadata,
- this.networkClient,
- this.groupState,
- this.coordinatorManager,
- this.commitManager,
- this.offsetsRequestManager,
- this.topicMetadataRequestManager);
+ this.time,
+ new ConsumerConfig(properties),
+ new LogContext(),
+ applicationEventsQueue,
+ backgroundEventsQueue,
+ this.errorEventHandler,
+ applicationEventProcessor,
+ this.metadata,
+ this.networkClient,
+ this.groupState,
+ this.coordinatorManager,
+ this.commitManager,
+ this.offsetsRequestManager,
+ this.topicMetadataRequestManager,
+ this.heartbeatRequestManager);
}
private NetworkClientDelegate.PollResult mockPollCoordinatorResult() {
@@ -416,7 +430,7 @@ public class DefaultBackgroundThreadTest {
Collections.singletonList(findCoordinatorUnsentRequest(time,
requestTimeoutMs)));
}
- private NetworkClientDelegate.PollResult emptyPollOffsetsRequestResult() {
+ private NetworkClientDelegate.PollResult emptyPollResults() {
return new NetworkClientDelegate.PollResult(Long.MAX_VALUE,
Collections.emptyList());
}
}
\ No newline at end of file
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
new file mode 100644
index 00000000000..598deace7f5
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG;
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HeartbeatRequestManagerTest {
+
+ private static final int HEARTBEAT_INTERVAL_MS = 1000;
+ private static final long RETRY_BACKOFF_MAX_MS = 3000;
+ private static final long RETRY_BACKOFF_MS = 100;
+ private static final String GROUP_INSTANCE_ID = "group-instance-id";
+ private static final String GROUP_ID = "group-id";
+
+ private Time time;
+ private LogContext logContext;
+ private CoordinatorRequestManager coordinatorRequestManager;
+ private SubscriptionState subscriptionState;
+ private HeartbeatRequestManager heartbeatRequestManager;
+ private MembershipManager membershipManager;
+ private HeartbeatRequestManager.HeartbeatRequestState
heartbeatRequestState;
+ private ConsumerConfig config;
+
+ private String memberId = "member-id";
+ private int memberEpoch = 1;
+ private ErrorEventHandler errorEventHandler;
+
+ @BeforeEach
+ public void setUp() {
+ time = new MockTime();
+ logContext = new LogContext();
+ config = new ConsumerConfig(createConsumerConfig());
+ coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new
Node(1, "localhost", 9999)));
+ subscriptionState = mock(SubscriptionState.class);
+ membershipManager = spy(new MembershipManagerImpl(GROUP_ID));
+ heartbeatRequestState =
mock(HeartbeatRequestManager.HeartbeatRequestState.class);
+ errorEventHandler = mock(ErrorEventHandler.class);
+ heartbeatRequestManager = createManager();
+ }
+
+ private Properties createConsumerConfig() {
+ Properties properties = new Properties();
+ properties.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ properties.put(KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
+ properties.put(VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
+ properties.put(RETRY_BACKOFF_MS_CONFIG, "100");
+ return properties;
+ }
+
+ @Test
+ public void testHeartbeatOnStartup() {
+ // The initial heartbeatInterval is set to 0
+ heartbeatRequestState = new
HeartbeatRequestManager.HeartbeatRequestState(
+ logContext,
+ time,
+ 0,
+ RETRY_BACKOFF_MS,
+ RETRY_BACKOFF_MAX_MS,
+ 0);
+ heartbeatRequestManager = createManager();
+ NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result.unsentRequests.size());
+
+ // Ensure we do not resend the request without the first request being
completed
+ NetworkClientDelegate.PollResult result2 =
heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(0, result2.unsentRequests.size());
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testSendHeartbeatOnMemberState(final boolean
shouldSendHeartbeat) {
+ // Stub whether the membership manager reports that a heartbeat should be sent
+
when(membershipManager.shouldSendHeartbeat()).thenReturn(shouldSendHeartbeat);
+ when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);
+
+ NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+
+ if (shouldSendHeartbeat) {
+ assertEquals(1, result.unsentRequests.size());
+ assertEquals(0, result.timeUntilNextPollMs);
+ } else {
+ assertEquals(0, result.unsentRequests.size());
+ assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
+
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("stateProvider")
+ public void testTimerNotDue(final MemberState state) {
+ heartbeatRequestState = new
HeartbeatRequestManager.HeartbeatRequestState(
+ logContext,
+ time,
+ HEARTBEAT_INTERVAL_MS,
+ RETRY_BACKOFF_MS,
+ RETRY_BACKOFF_MAX_MS);
+ heartbeatRequestManager = createManager();
+
+ when(membershipManager.state()).thenReturn(state);
+ time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat
should be sent
+ NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(0, result.unsentRequests.size());
+
+ if (membershipManager.shouldSendHeartbeat()) {
+ assertEquals(HEARTBEAT_INTERVAL_MS - 100,
result.timeUntilNextPollMs);
+ } else {
+ assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
+ }
+ }
+
+ @Test
+ public void testBackoffOnTimeout() {
+ heartbeatRequestState = new
HeartbeatRequestManager.HeartbeatRequestState(
+ logContext,
+ time,
+ 0,
+ RETRY_BACKOFF_MS,
+ RETRY_BACKOFF_MAX_MS,
+ 0);
+ heartbeatRequestManager = createManager();
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new
Node(1, "localhost", 9999)));
+ when(membershipManager.shouldSendHeartbeat()).thenReturn(true);
+ NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result.unsentRequests.size());
+ result.unsentRequests.get(0).future().completeExceptionally(new
TimeoutException("timeout"));
+
+ // Ensure the manager backs off after a timeout
+ time.sleep(RETRY_BACKOFF_MS - 1);
+ result = heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(0, result.unsentRequests.size());
+
+ time.sleep(1);
+ result = heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result.unsentRequests.size());
+ }
+
+ @Test
+ public void testFailureOnFatalException() {
+ heartbeatRequestState = spy(new
HeartbeatRequestManager.HeartbeatRequestState(
+ logContext,
+ time,
+ 0,
+ RETRY_BACKOFF_MS,
+ RETRY_BACKOFF_MAX_MS,
+ 0));
+ heartbeatRequestManager = createManager();
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new
Node(1, "localhost", 9999)));
+ when(membershipManager.shouldSendHeartbeat()).thenReturn(true);
+ NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result.unsentRequests.size());
+ result.unsentRequests.get(0).future().completeExceptionally(new
KafkaException("fatal"));
+ verify(membershipManager).transitionToFailed();
+ verify(errorEventHandler).handle(any());
+ }
+
+ @Test
+ public void testNoCoordinator() {
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
+ NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+
+ assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
+ assertEquals(0, result.unsentRequests.size());
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
+ public void testValidateConsumerGroupHeartbeatRequest(final short version)
{
+ List<String> subscribedTopics = Collections.singletonList("topic");
+ subscriptionState = new SubscriptionState(logContext,
OffsetResetStrategy.NONE);
+ subscriptionState.subscribe(new HashSet<>(subscribedTopics), new
NoOpConsumerRebalanceListener());
+
+ Properties prop = createConsumerConfig();
+ prop.setProperty(MAX_POLL_INTERVAL_MS_CONFIG, "10000");
+ config = new ConsumerConfig(prop);
+ membershipManager = new MembershipManagerImpl(GROUP_ID,
GROUP_INSTANCE_ID, null);
+ heartbeatRequestState = new
HeartbeatRequestManager.HeartbeatRequestState(
+ logContext,
+ time,
+ 0,
+ RETRY_BACKOFF_MS,
+ RETRY_BACKOFF_MAX_MS,
+ 0);
+ heartbeatRequestManager = createManager();
+ // Update membershipManager's memberId and memberEpoch
+ ConsumerGroupHeartbeatResponse result =
+ new ConsumerGroupHeartbeatResponse(new
ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(memberEpoch));
+ membershipManager.updateState(result.data());
+
+ // Create a ConsumerGroupHeartbeatRequest and verify the payload
+ NetworkClientDelegate.PollResult pollResult =
heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, pollResult.unsentRequests.size());
+ NetworkClientDelegate.UnsentRequest request =
pollResult.unsentRequests.get(0);
+ assertTrue(request.requestBuilder() instanceof
ConsumerGroupHeartbeatRequest.Builder);
+
+ ConsumerGroupHeartbeatRequest heartbeatRequest =
+ (ConsumerGroupHeartbeatRequest)
request.requestBuilder().build(version);
+ assertEquals(GROUP_ID, heartbeatRequest.data().groupId());
+ assertEquals(memberId, heartbeatRequest.data().memberId());
+ assertEquals(memberEpoch, heartbeatRequest.data().memberEpoch());
+ assertEquals(10000, heartbeatRequest.data().rebalanceTimeoutMs());
+ assertEquals(subscribedTopics,
heartbeatRequest.data().subscribedTopicNames());
+ assertEquals(GROUP_INSTANCE_ID, heartbeatRequest.data().instanceId());
+ // TODO: Test pattern subscription and user provided assignor
selection.
+ assertNull(heartbeatRequest.data().serverAssignor());
+ assertNull(heartbeatRequest.data().subscribedTopicRegex());
+ }
+
+ @ParameterizedTest
+ @MethodSource("errorProvider")
+ public void testHeartbeatResponseOnErrorHandling(final Errors error, final
boolean isFatal) {
+ heartbeatRequestState = new
HeartbeatRequestManager.HeartbeatRequestState(
+ logContext,
+ time,
+ HEARTBEAT_INTERVAL_MS,
+ RETRY_BACKOFF_MS,
+ RETRY_BACKOFF_MAX_MS,
+ 0);
+ heartbeatRequestManager = createManager();
+
+ // Applying a first heartbeat response w/o assignment to set the state to STABLE
+ ConsumerGroupHeartbeatResponse rs1 = new
ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
+ .setHeartbeatIntervalMs(HEARTBEAT_INTERVAL_MS)
+ .setMemberId(memberId)
+ .setMemberEpoch(memberEpoch));
+ membershipManager.updateState(rs1.data());
+ assertEquals(MemberState.STABLE, membershipManager.state());
+
+ // Handling errors on the second heartbeat
+ time.sleep(HEARTBEAT_INTERVAL_MS);
+ NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result.unsentRequests.size());
+
+ // Manually completing the response to test error handling
+ ClientResponse response = createHeartbeatResponse(
+ result.unsentRequests.get(0),
+ error);
+ result.unsentRequests.get(0).future().complete(response);
+ ConsumerGroupHeartbeatResponse mockResponse =
(ConsumerGroupHeartbeatResponse) response.responseBody();
+
+ switch (error) {
+ case NONE:
+ verify(errorEventHandler, never()).handle(any());
+ verify(membershipManager,
times(2)).updateState(mockResponse.data());
+ assertEquals(HEARTBEAT_INTERVAL_MS,
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()));
+ break;
+
+ case COORDINATOR_LOAD_IN_PROGRESS:
+ verify(errorEventHandler, never()).handle(any());
+ assertEquals(RETRY_BACKOFF_MS,
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()));
+ break;
+
+ case COORDINATOR_NOT_AVAILABLE:
+ case NOT_COORDINATOR:
+ verify(errorEventHandler, never()).handle(any());
+
verify(coordinatorRequestManager).markCoordinatorUnknown(any(), anyLong());
+ assertEquals(0,
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()));
+ break;
+
+ default:
+ if (isFatal) {
+ // The member has transitioned to FAILED, so heartbeating should have stopped at
this point
+ ensureFatalError();
+ } else {
+ verify(errorEventHandler, never()).handle(any());
+ assertEquals(0,
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()));
+ }
+ break;
+ }
+ }
+
+ private void ensureFatalError() {
+ verify(membershipManager).transitionToFailed();
+ verify(errorEventHandler).handle(any());
+ ensureHeartbeatStopped();
+ }
+
+ private void ensureHeartbeatStopped() {
+ time.sleep(HEARTBEAT_INTERVAL_MS);
+ assertEquals(MemberState.FAILED, membershipManager.state());
+ NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(0, result.unsentRequests.size());
+ }
+
+ // error, isFatal
+ private static Collection<Arguments> errorProvider() {
+ return Arrays.asList(
+ Arguments.of(Errors.NONE, false),
+ Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE, false),
+ Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS, false),
+ Arguments.of(Errors.NOT_COORDINATOR, false),
+ Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED, true),
+ Arguments.of(Errors.INVALID_REQUEST, true),
+ Arguments.of(Errors.UNKNOWN_MEMBER_ID, false),
+ Arguments.of(Errors.FENCED_MEMBER_EPOCH, false),
+ Arguments.of(Errors.UNSUPPORTED_ASSIGNOR, true),
+ Arguments.of(Errors.UNSUPPORTED_VERSION, true),
+ Arguments.of(Errors.UNRELEASED_INSTANCE_ID, true),
+ Arguments.of(Errors.GROUP_MAX_SIZE_REACHED, true));
+ }
+
+ private static Collection<Arguments> stateProvider() {
+ return Arrays.asList(
+ Arguments.of(MemberState.UNJOINED),
+ Arguments.of(MemberState.RECONCILING),
+ Arguments.of(MemberState.FAILED),
+ Arguments.of(MemberState.STABLE),
+ Arguments.of(MemberState.FENCED));
+ }
+
+ private ClientResponse createHeartbeatResponse(
+ final NetworkClientDelegate.UnsentRequest request,
+ final Errors error) {
+ ConsumerGroupHeartbeatResponseData data = new
ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(error.code())
+ .setHeartbeatIntervalMs(HEARTBEAT_INTERVAL_MS)
+ .setMemberId(memberId)
+ .setMemberEpoch(memberEpoch);
+ if (error != Errors.NONE) {
+ data.setErrorMessage("stubbed error message");
+ }
+ ConsumerGroupHeartbeatResponse response = new
ConsumerGroupHeartbeatResponse(data);
+ return new ClientResponse(
+ new RequestHeader(ApiKeys.CONSUMER_GROUP_HEARTBEAT,
ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion(), "client-id", 1),
+ request.callback(),
+ "0",
+ time.milliseconds(),
+ time.milliseconds(),
+ false,
+ null,
+ null,
+ response);
+ }
+
+ private HeartbeatRequestManager createManager() {
+ return new HeartbeatRequestManager(
+ logContext,
+ time,
+ config,
+ coordinatorRequestManager,
+ subscriptionState,
+ membershipManager,
+ heartbeatRequestState,
+ errorEventHandler);
+ }
+}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index 09e47acb416..4b6aa80c04e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -22,8 +22,6 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
import java.util.Arrays;
@@ -84,9 +82,23 @@ public class MembershipManagerImplTest {
assertEquals(MemberState.RECONCILING, membershipManager.state());
}
- @ParameterizedTest
- @EnumSource(Errors.class)
- public void testMemberIdAndEpochResetOnErrors(Errors error) {
+ @Test
+ public void testMemberIdAndEpochResetOnFencedMembers() {
+ MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
+ ConsumerGroupHeartbeatResponse heartbeatResponse =
+ createConsumerGroupHeartbeatResponse(null);
+ membershipManager.updateState(heartbeatResponse.data());
+ assertEquals(MemberState.STABLE, membershipManager.state());
+ assertEquals(MEMBER_ID, membershipManager.memberId());
+ assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
+
+ membershipManager.transitionToFenced();
+ assertFalse(membershipManager.memberId().isEmpty());
+ assertEquals(0, membershipManager.memberEpoch());
+ }
+
+ @Test
+ public void testTransitionToFailure() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(null);
@@ -95,23 +107,8 @@ public class MembershipManagerImplTest {
assertEquals(MEMBER_ID, membershipManager.memberId());
assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
- if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.FENCED_MEMBER_EPOCH) {
- // Should reset member epoch and keep member id
- ConsumerGroupHeartbeatResponse heartbeatResponseWithMemberIdError =
- createConsumerGroupHeartbeatResponseWithError(Errors.FENCED_MEMBER_EPOCH);
- membershipManager.updateState(heartbeatResponseWithMemberIdError.data());
-
- assertFalse(membershipManager.memberId().isEmpty());
- assertEquals(0, membershipManager.memberEpoch());
- } else {
- // Should not reset member id or epoch
- ConsumerGroupHeartbeatResponse heartbeatResponseWithError =
- createConsumerGroupHeartbeatResponseWithError(error);
- membershipManager.updateState(heartbeatResponseWithError.data());
-
- assertFalse(membershipManager.memberId().isEmpty());
- assertNotEquals(0, membershipManager.memberEpoch());
- }
+ membershipManager.transitionToFailed();
+ assertEquals(MemberState.FAILED, membershipManager.state());
}
@Test