This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6c23a900fcb KAFKA-15278: Implement HeartbeatRequestManager to handle 
heartbeat requests (#14364)
6c23a900fcb is described below

commit 6c23a900fcbb2b8c2c092a355921b3424041ab4f
Author: Philip Nee <[email protected]>
AuthorDate: Mon Oct 9 08:35:42 2023 -0700

    KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests 
(#14364)
    
    HeartbeatRequestManager is responsible for handling the 
ConsumerGroupHeartbeat request and response.  The manager has the following 
responsibilities:
    1. Sending the request to the GroupCoordinator when it is possible and 
necessary
    2. Handling the response and update the `MembershipManagerImpl` based on 
the error/response it receives.
    3. Handles request retries and fatal failures
    
    For Successful heartbeat response:
    - Updates the MembershipManager
    
    For Failures handling:
    - Retriables Errors: backoff and retries
    - Fenced: Transition to a fenced state and reset the epoch, and retry in 
the next poll
    - Fatal: Propagate the error to the user and fail the state machine
    
    Reviewers: Kirk True <[email protected]>, Lianet Magrans 
<[email protected]>, David Jacot <[email protected]>
---
 .../consumer/internals/AssignorSelection.java      |  14 +-
 .../internals/DefaultBackgroundThread.java         |  64 ++--
 .../internals/HeartbeatRequestManager.java         | 385 +++++++++++++++++++
 .../consumer/internals/MembershipManager.java      |  16 +
 .../consumer/internals/MembershipManagerImpl.java  |  47 ++-
 .../consumer/internals/RequestManagers.java        |   6 +-
 .../internals/events/ErrorBackgroundEvent.java     |   6 +-
 .../message/ConsumerGroupHeartbeatRequest.json     |   2 +-
 .../clients/consumer/AssignorSelectionTest.java    |   4 +-
 .../internals/DefaultBackgroundThreadTest.java     | 120 +++---
 .../internals/HeartbeatRequestManagerTest.java     | 409 +++++++++++++++++++++
 .../internals/MembershipManagerImplTest.java       |  41 +--
 12 files changed, 985 insertions(+), 129 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java
index 1d8d0c8cc51..5eaae957ea5 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import java.util.Objects;
+import java.util.Optional;
 
 /**
  * Selection of a type of assignor used by a member to get partitions assigned 
as part of a
@@ -30,12 +31,12 @@ public class AssignorSelection {
     public enum Type { SERVER }
 
     private final AssignorSelection.Type type;
-    private String serverAssignor;
+    private final Optional<String> serverAssignor;
 
     private AssignorSelection(Type type, String serverAssignor) {
         this.type = type;
         if (type == Type.SERVER) {
-            this.serverAssignor = serverAssignor;
+            this.serverAssignor = Optional.ofNullable(serverAssignor);
         } else {
             throw new IllegalArgumentException("Unsupported assignor type " + 
type);
         }
@@ -53,10 +54,10 @@ public class AssignorSelection {
 
     public static AssignorSelection defaultAssignor() {
         // TODO: review default selection
-        return new AssignorSelection(Type.SERVER, "uniform");
+        return new AssignorSelection(Type.SERVER, null);
     }
 
-    public String serverAssignor() {
+    public Optional<String> serverAssignor() {
         return this.serverAssignor;
     }
 
@@ -79,6 +80,9 @@ public class AssignorSelection {
 
     @Override
     public String toString() {
-        return String.format("Assignor selection {type:%s, name:%s}", type, 
serverAssignor);
+        return "AssignorSelection(" +
+                "type=" + type + ", " +
+                "serverAssignor='" + serverAssignor + '\'' +
+                ')';
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
index 90e27559f90..774f4a141b8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
@@ -91,7 +91,8 @@ public class DefaultBackgroundThread extends KafkaThread {
                             final CoordinatorRequestManager coordinatorManager,
                             final CommitRequestManager commitRequestManager,
                             final OffsetsRequestManager offsetsRequestManager,
-                            final TopicMetadataRequestManager 
topicMetadataRequestManager) {
+                            final TopicMetadataRequestManager 
topicMetadataRequestManager,
+                            final HeartbeatRequestManager 
heartbeatRequestManager) {
         super(BACKGROUND_THREAD_NAME, true);
         this.time = time;
         this.running = true;
@@ -108,7 +109,8 @@ public class DefaultBackgroundThread extends KafkaThread {
                 offsetsRequestManager,
                 topicMetadataRequestManager,
                 Optional.ofNullable(coordinatorManager),
-                Optional.ofNullable(commitRequestManager));
+                Optional.ofNullable(commitRequestManager),
+                Optional.ofNullable(heartbeatRequestManager));
     }
 
     public DefaultBackgroundThread(final Time time,
@@ -138,19 +140,19 @@ public class DefaultBackgroundThread extends KafkaThread {
             this.config = config;
             this.metadata = metadata;
             final NetworkClient networkClient = 
ClientUtils.createNetworkClient(config,
-                    metrics,
-                    CONSUMER_METRIC_GROUP_PREFIX,
-                    logContext,
-                    apiVersions,
-                    time,
-                    CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION,
-                    metadata,
-                    fetcherThrottleTimeSensor);
+                metrics,
+                CONSUMER_METRIC_GROUP_PREFIX,
+                logContext,
+                apiVersions,
+                time,
+                CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION,
+                metadata,
+                fetcherThrottleTimeSensor);
             this.networkClientDelegate = new NetworkClientDelegate(
-                    this.time,
-                    this.config,
-                    logContext,
-                    networkClient);
+                this.time,
+                this.config,
+                logContext,
+                networkClient);
             this.running = true;
             this.errorEventHandler = new 
ErrorEventHandler(this.backgroundEventQueue);
             this.groupState = new GroupState(rebalanceConfig);
@@ -159,22 +161,24 @@ public class DefaultBackgroundThread extends KafkaThread {
             final int requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
 
             OffsetsRequestManager offsetsRequestManager =
-                    new OffsetsRequestManager(
-                            subscriptionState,
-                            metadata,
-                            configuredIsolationLevel(config),
-                            time,
-                            retryBackoffMs,
-                            requestTimeoutMs,
-                            apiVersions,
-                            networkClientDelegate,
-                            logContext);
+                new OffsetsRequestManager(
+                    subscriptionState,
+                    metadata,
+                    configuredIsolationLevel(config),
+                    time,
+                    retryBackoffMs,
+                    requestTimeoutMs,
+                    apiVersions,
+                    networkClientDelegate,
+                    logContext);
             CoordinatorRequestManager coordinatorRequestManager = null;
             CommitRequestManager commitRequestManager = null;
             TopicMetadataRequestManager topicMetadataRequestManger = new 
TopicMetadataRequestManager(
                 logContext,
                 config);
+            HeartbeatRequestManager heartbeatRequestManager = null;
 
+            // TODO: consolidate groupState and memberState
             if (groupState.groupId != null) {
                 coordinatorRequestManager = new CoordinatorRequestManager(
                         this.time,
@@ -190,13 +194,23 @@ public class DefaultBackgroundThread extends KafkaThread {
                         config,
                         coordinatorRequestManager,
                         groupState);
+                MembershipManager membershipManager = new 
MembershipManagerImpl(groupState.groupId);
+                heartbeatRequestManager = new HeartbeatRequestManager(
+                        this.time,
+                        logContext,
+                        config,
+                        coordinatorRequestManager,
+                        subscriptionState,
+                        membershipManager,
+                        errorEventHandler);
             }
 
             this.requestManagers = new RequestManagers(
                 offsetsRequestManager,
                 topicMetadataRequestManger,
                 Optional.ofNullable(coordinatorRequestManager),
-                Optional.ofNullable(commitRequestManager));
+                Optional.ofNullable(commitRequestManager),
+                Optional.ofNullable(heartbeatRequestManager));
             this.applicationEventProcessor = new ApplicationEventProcessor(
                 backgroundEventQueue,
                 requestManagers,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
new file mode 100644
index 00000000000..7941c4effb2
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * <p>Manages the request creation and response handling for the heartbeat. 
The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
MembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, the module 
will update the state in the
+ * {@link MembershipManager} and handle any errors.</p>
+ *
+ * <p>The manager will try to send a heartbeat when the member is in {@link 
MemberState#STABLE},
+ * {@link MemberState#UNJOINED}, or {@link MemberState#RECONCILING}. Which 
mean the member is either in a stable
+ * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.</p>
+ *
+ * <p>If the member got kick out of a group, it will try to give up the 
current assignment by invoking {@code
+ * OnPartitionsLost} because reattempting to join again with a zero epoch.</p>
+ *
+ * <p>If the member does not have groupId configured or encountering fatal 
exceptions, a heartbeat will not be sent.</p>
+ *
+ * <p>If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.</p>
+ *
+ * <p>If the heartbeat failed due to retriable errors, such as, 
TimeoutException. The subsequent attempt will be
+ * backoff exponentially.</p>
+ *
+ * <p>When the member completes the assignment reconciliation, the {@link 
HeartbeatRequestState} will be reset so
+ * that a heartbeat will be sent in the next event loop.</p>
+ *
+ * <p>See {@link HeartbeatRequestState} for more details.</p>
+ */
+public class HeartbeatRequestManager implements RequestManager {
+    private final Logger logger;
+    private final Time time;
+
+    /**
+     * Time that the group coordinator will wait on member to revoke its 
partitions. This is provided by the group
+     * coordinator in the heartbeat
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * CoordinatorRequestManager manages the connection to the group 
coordinator
+     */
+    private final CoordinatorRequestManager coordinatorRequestManager;
+
+    /**
+     * SubscriptionState tracks the topic, partition and offset of the member
+     */
+    private final SubscriptionState subscriptions;
+
+    /**
+     * HeartbeatRequestState manages heartbeat request timing and retries
+     */
+    private final HeartbeatRequestState heartbeatRequestState;
+
+    /**
+     * MembershipManager manages member's essential attributes like epoch and 
id, and its rebalance state
+     */
+    private final MembershipManager membershipManager;
+
+    /**
+     * ErrorEventHandler allows the background thread to propagate errors back 
to the user
+     */
+    private final ErrorEventHandler nonRetriableErrorHandler;
+
+    public HeartbeatRequestManager(
+        final Time time,
+        final LogContext logContext,
+        final ConsumerConfig config,
+        final CoordinatorRequestManager coordinatorRequestManager,
+        final SubscriptionState subscriptions,
+        final MembershipManager membershipManager,
+        final ErrorEventHandler nonRetriableErrorHandler) {
+        this.coordinatorRequestManager = coordinatorRequestManager;
+        this.time = time;
+        this.logger = logContext.logger(getClass());
+        this.subscriptions = subscriptions;
+        this.membershipManager = membershipManager;
+        this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+        this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+        long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+        long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+        this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+            retryBackoffMaxMs, rebalanceTimeoutMs);
+    }
+
+    // Visible for testing
+    HeartbeatRequestManager(
+        final LogContext logContext,
+        final Time time,
+        final ConsumerConfig config,
+        final CoordinatorRequestManager coordinatorRequestManager,
+        final SubscriptionState subscriptions,
+        final MembershipManager membershipManager,
+        final HeartbeatRequestState heartbeatRequestState,
+        final ErrorEventHandler nonRetriableErrorHandler) {
+        this.logger = logContext.logger(this.getClass());
+        this.time = time;
+        this.subscriptions = subscriptions;
+        this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+        this.coordinatorRequestManager = coordinatorRequestManager;
+        this.heartbeatRequestState = heartbeatRequestState;
+        this.membershipManager = membershipManager;
+        this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+    }
+
+    /**
+     * Determines the maximum wait time until the next poll based on the 
member's state, and creates a heartbeat
+     * request.
+     * <ol>
+     *     <li>If the member is without a coordinator or is in a failed state, 
the timer is set to Long.MAX_VALUE, as there's no need to send a heartbeat.</li>
+     *     <li>If the member cannot send a heartbeat due to either exponential 
backoff, it will return the remaining time left on the backoff timer.</li>
+     *     <li>If the member's heartbeat timer has not expired, It will return 
the remaining time left on the
+     *     heartbeat timer.</li>
+     *     <li>If the member can send a heartbeat, the timer is set to the 
current heartbeat interval.</li>
+     * </ol>
+     */
+    @Override
+    public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+        if (!coordinatorRequestManager.coordinator().isPresent() || 
!membershipManager.shouldSendHeartbeat()) {
+            return new NetworkClientDelegate.PollResult(
+                Long.MAX_VALUE, Collections.emptyList());
+        }
+
+        // TODO: We will need to send a heartbeat response after partitions 
being revoke. This needs to be
+        //  implemented either with or after the partition reconciliation 
logic.
+        if (!heartbeatRequestState.canSendRequest(currentTimeMs)) {
+            return new NetworkClientDelegate.PollResult(
+                heartbeatRequestState.nextHeartbeatMs(currentTimeMs),
+                Collections.emptyList());
+        }
+        this.heartbeatRequestState.onSendAttempt(currentTimeMs);
+        NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest();
+        return new 
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, 
Collections.singletonList(request));
+    }
+
+    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
+        // TODO: We only need to send the rebalanceTimeoutMs field once unless 
the first request failed.
+        ConsumerGroupHeartbeatRequestData data = new 
ConsumerGroupHeartbeatRequestData()
+            .setGroupId(membershipManager.groupId())
+            .setMemberEpoch(membershipManager.memberEpoch())
+            .setMemberId(membershipManager.memberId())
+            .setRebalanceTimeoutMs(rebalanceTimeoutMs);
+
+        membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
+
+        if (this.subscriptions.hasPatternSubscription()) {
+            // TODO: Pass the string to the GC if server side regex is used.
+        } else {
+            data.setSubscribedTopicNames(new 
ArrayList<>(this.subscriptions.subscription()));
+        }
+
+        
this.membershipManager.assignorSelection().serverAssignor().ifPresent(data::setServerAssignor);
+
+        NetworkClientDelegate.UnsentRequest request = new 
NetworkClientDelegate.UnsentRequest(
+            new ConsumerGroupHeartbeatRequest.Builder(data),
+            coordinatorRequestManager.coordinator());
+        request.future().whenComplete((response, exception) -> {
+            if (response != null) {
+                onResponse((ConsumerGroupHeartbeatResponse) 
response.responseBody(), response.receivedTimeMs());
+            } else {
+                // TODO: Currently, we lack a good way to propage the response 
time from the network client to the
+                //  request handler. We will need to store the response time 
in the handler to make it accessible.
+                onFailure(exception, time.milliseconds());
+            }
+        });
+        return request;
+    }
+
+    private void onFailure(final Throwable exception, final long 
responseTimeMs) {
+        this.heartbeatRequestState.onFailedAttempt(responseTimeMs);
+        if (exception instanceof RetriableException) {
+            String message = String.format("GroupHeartbeatRequest failed 
because of the retriable exception. " +
+                    "Will retry in %s ms: {}",
+                heartbeatRequestState.remainingBackoffMs(responseTimeMs),
+                exception.getMessage());
+            logger.debug(message);
+        } else {
+            logger.error("GroupHeartbeatRequest failed due to fatal error: " + 
exception.getMessage());
+            handleFatalFailure(exception);
+        }
+    }
+
+    private void onResponse(final ConsumerGroupHeartbeatResponse response, 
long currentTimeMs) {
+        if (Errors.forCode(response.data().errorCode()) == Errors.NONE) {
+            
this.heartbeatRequestState.updateHeartbeatIntervalMs(response.data().heartbeatIntervalMs());
+            this.heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
+            this.heartbeatRequestState.resetTimer();
+            this.membershipManager.updateState(response.data());
+            return;
+        }
+        onErrorResponse(response, currentTimeMs);
+    }
+
+    private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
+                                 final long currentTimeMs) {
+        Errors error = Errors.forCode(response.data().errorCode());
+        String errorMessage = response.data().errorMessage();
+        // TODO: upon encountering a fatal/fenced error, trigger 
onPartitionLost logic to give up the current
+        //  assignments.
+        switch (error) {
+            case NOT_COORDINATOR:
+                // the manager should retry immediately when the coordinator 
node becomes available again
+                String message = String.format("GroupHeartbeatRequest failed 
because the group coordinator %s is incorrect. " +
+                                "Will attempt to find the coordinator again 
and retry",
+                        coordinatorRequestManager.coordinator());
+                logInfo(message, response, currentTimeMs);
+                coordinatorRequestManager.markCoordinatorUnknown(errorMessage, 
currentTimeMs);
+                break;
+
+            case COORDINATOR_NOT_AVAILABLE:
+                message = String.format("GroupHeartbeatRequest failed because 
the group coordinator %s is not available. " +
+                                "Will attempt to find the coordinator again 
and retry",
+                        coordinatorRequestManager.coordinator());
+                logInfo(message, response, currentTimeMs);
+                coordinatorRequestManager.markCoordinatorUnknown(errorMessage, 
currentTimeMs);
+                break;
+
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                // the manager will backoff and retry
+                message = String.format("GroupHeartbeatRequest failed because 
the group coordinator %s is still loading." +
+                                "Will retry",
+                        coordinatorRequestManager.coordinator());
+                logInfo(message, response, currentTimeMs);
+                heartbeatRequestState.onFailedAttempt(currentTimeMs);
+                break;
+
+            case GROUP_AUTHORIZATION_FAILED:
+                GroupAuthorizationException exception =
+                        
GroupAuthorizationException.forGroupId(membershipManager.groupId());
+                logger.error("GroupHeartbeatRequest failed due to group 
authorization failure: {}", exception.getMessage());
+                handleFatalFailure(error.exception(exception.getMessage()));
+                break;
+
+            case UNRELEASED_INSTANCE_ID:
+                logger.error("GroupHeartbeatRequest failed due to the instance 
id {} was not released: {}",
+                        membershipManager.groupInstanceId().orElse("null"), 
errorMessage);
+                
handleFatalFailure(Errors.UNRELEASED_INSTANCE_ID.exception(errorMessage));
+                break;
+
+            case INVALID_REQUEST:
+            case GROUP_MAX_SIZE_REACHED:
+            case UNSUPPORTED_ASSIGNOR:
+            case UNSUPPORTED_VERSION:
+                logger.error("GroupHeartbeatRequest failed due to error: {}", 
error);
+                handleFatalFailure(error.exception(errorMessage));
+                break;
+
+            case FENCED_MEMBER_EPOCH:
+                message = String.format("GroupHeartbeatRequest failed because 
member epoch %s is invalid. " +
+                                "Will abandon all partitions and rejoin the 
group",
+                        membershipManager.memberId(), 
membershipManager.memberEpoch());
+                logInfo(message, response, currentTimeMs);
+                membershipManager.transitionToFenced();
+                break;
+
+            case UNKNOWN_MEMBER_ID:
+                message = String.format("GroupHeartbeatRequest failed because 
member id %s is invalid. " +
+                                "Will abandon all partitions and rejoin the 
group",
+                        membershipManager.memberId(), 
membershipManager.memberEpoch());
+                logInfo(message, response, currentTimeMs);
+                membershipManager.transitionToFenced();
+                break;
+
+            default:
+                // If the manager receives an unknown error - there could be a 
bug in the code or a new error code
+                logger.error("GroupHeartbeatRequest failed due to unexpected 
error: {}", error);
+                handleFatalFailure(error.exception(errorMessage));
+                break;
+        }
+    }
+
+    private void logInfo(final String message,
+                         final ConsumerGroupHeartbeatResponse response,
+                         final long currentTimeMs) {
+        logger.info("{} in {}ms: {}",
+            message,
+            heartbeatRequestState.remainingBackoffMs(currentTimeMs),
+            response.data().errorMessage());
+    }
+
+    private void handleFatalFailure(Throwable error) {
+        nonRetriableErrorHandler.handle(error);
+        membershipManager.transitionToFailed();
+    }
+
+    /**
+     * Represents the state of a heartbeat request, including logic for 
timing, retries, and exponential backoff. The
+     * object extends {@link RequestState} to enable exponential backoff and 
duplicated request handling. The two fields
+     * that it holds are:
+     */
+    static class HeartbeatRequestState extends RequestState {
+        /**
+         *  heartbeatTimer tracks the time since the last heartbeat was sent
+         */
+        private final Timer heartbeatTimer;
+
+        /**
+         * The heartbeat interval which is acquired/updated through the 
heartbeat request
+         */
+        private long heartbeatIntervalMs;
+
+        public HeartbeatRequestState(
+            final LogContext logContext,
+            final Time time,
+            final long heartbeatIntervalMs,
+            final long retryBackoffMs,
+            final long retryBackoffMaxMs) {
+            super(logContext, HeartbeatRequestState.class.getName(), 
retryBackoffMs, retryBackoffMaxMs);
+            this.heartbeatIntervalMs = heartbeatIntervalMs;
+            this.heartbeatTimer = time.timer(heartbeatIntervalMs);
+        }
+
+        public HeartbeatRequestState(
+            final LogContext logContext,
+            final Time time,
+            final long heartbeatIntervalMs,
+            final long retryBackoffMs,
+            final long retryBackoffMaxMs,
+            final double jitter) {
+            super(logContext, HeartbeatRequestState.class.getName(), 
retryBackoffMs, 2, retryBackoffMaxMs, jitter);
+            this.heartbeatIntervalMs = heartbeatIntervalMs;
+            this.heartbeatTimer = time.timer(heartbeatIntervalMs);
+        }
+
+        private void update(final long currentTimeMs) {
+            this.heartbeatTimer.update(currentTimeMs);
+        }
+
+        public void resetTimer() {
+            this.heartbeatTimer.reset(heartbeatIntervalMs);
+        }
+
+        @Override
+        public boolean canSendRequest(final long currentTimeMs) {
+            update(currentTimeMs);
+            return heartbeatTimer.isExpired() && 
super.canSendRequest(currentTimeMs);
+        }
+
+        public long nextHeartbeatMs(final long currentTimeMs) {
+            if (heartbeatTimer.remainingMs() == 0) {
+                return this.remainingBackoffMs(currentTimeMs);
+            }
+            return heartbeatTimer.remainingMs();
+        }
+
+        private void updateHeartbeatIntervalMs(final long heartbeatIntervalMs) 
{
+            if (this.heartbeatIntervalMs == heartbeatIntervalMs) {
+                // no need to update the timer if the interval hasn't changed
+                return;
+            }
+            this.heartbeatIntervalMs = heartbeatIntervalMs;
+            this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
+        }
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
index a0a72e56336..c0fb9ed3903 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
@@ -59,4 +59,20 @@ public interface MembershipManager {
      * current assignment.
      */
     void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment 
assignment);
+
+    /**
+     * Transition the member to the FENCED state.  This is only invoked when 
the heartbeat returns a
+     * FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID error code.
+     */
+    void transitionToFenced();
+
+    /**
+     * Transition the member to the FAILED state.  This is invoked when the 
heartbeat returns a non-retriable error.
+     */
+    void transitionToFailed();
+
+    /**
+     * Return true if the member should send heartbeat to the coordinator
+     */
+    boolean shouldSendHeartbeat();
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index ad425f41d72..f4f97bb36f9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -115,25 +115,36 @@ public class MembershipManagerImpl implements 
MembershipManager {
 
     @Override
     public void updateState(ConsumerGroupHeartbeatResponseData response) {
-        if (response.errorCode() == Errors.NONE.code()) {
-            this.memberId = response.memberId();
-            this.memberEpoch = response.memberEpoch();
-            ConsumerGroupHeartbeatResponseData.Assignment assignment = 
response.assignment();
-            if (assignment != null) {
-                setTargetAssignment(assignment);
-            }
-            maybeTransitionToStable();
-        } else {
-            if (response.errorCode() == Errors.FENCED_MEMBER_EPOCH.code() || 
response.errorCode() == Errors.UNKNOWN_MEMBER_ID.code()) {
-                resetEpoch();
-                transitionTo(MemberState.FENCED);
-            } else if (response.errorCode() == 
Errors.UNRELEASED_INSTANCE_ID.code()) {
-                transitionTo(MemberState.FAILED);
-            }
-            // TODO: handle other errors here to update state accordingly, 
mainly making the
-            //  distinction between the recoverable errors and the fatal ones, 
that should FAILED
-            //  the member
+        if (response.errorCode() != Errors.NONE.code()) {
+            String errorMessage = String.format(
+                    "Unexpected error in Heartbeat response. Expected no 
error, but received: %s",
+                    Errors.forCode(response.errorCode())
+            );
+            throw new IllegalStateException(errorMessage);
+        }
+        this.memberId = response.memberId();
+        this.memberEpoch = response.memberEpoch();
+        ConsumerGroupHeartbeatResponseData.Assignment assignment = 
response.assignment();
+        if (assignment != null) {
+            setTargetAssignment(assignment);
         }
+        maybeTransitionToStable();
+    }
+
+    @Override
+    public void transitionToFenced() {
+        resetEpoch();
+        transitionTo(MemberState.FENCED);
+    }
+
+    @Override
+    public void transitionToFailed() {
+        transitionTo(MemberState.FAILED);
+    }
+
+    @Override
+    public boolean shouldSendHeartbeat() {
+        return state() != MemberState.FAILED;
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 3864b1fcaa6..5a397532fe3 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -32,6 +32,7 @@ public class RequestManagers {
 
     public final Optional<CoordinatorRequestManager> coordinatorRequestManager;
     public final Optional<CommitRequestManager> commitRequestManager;
+    private final Optional<HeartbeatRequestManager> heartbeatRequestManager;
     public final OffsetsRequestManager offsetsRequestManager;
     public final TopicMetadataRequestManager topicMetadataRequestManager;
     private final List<Optional<? extends RequestManager>> entries;
@@ -39,16 +40,19 @@ public class RequestManagers {
     public RequestManagers(OffsetsRequestManager offsetsRequestManager,
                            TopicMetadataRequestManager 
topicMetadataRequestManager,
                            Optional<CoordinatorRequestManager> 
coordinatorRequestManager,
-                           Optional<CommitRequestManager> 
commitRequestManager) {
+                           Optional<CommitRequestManager> commitRequestManager,
+                           Optional<HeartbeatRequestManager> 
heartbeatRequestManager) {
         this.offsetsRequestManager = requireNonNull(offsetsRequestManager,
                 "OffsetsRequestManager cannot be null");
         this.coordinatorRequestManager = coordinatorRequestManager;
         this.commitRequestManager = commitRequestManager;
         this.topicMetadataRequestManager = topicMetadataRequestManager;
+        this.heartbeatRequestManager = heartbeatRequestManager;
 
         List<Optional<? extends RequestManager>> list = new ArrayList<>();
         list.add(coordinatorRequestManager);
         list.add(commitRequestManager);
+        list.add(heartbeatRequestManager);
         list.add(Optional.of(offsetsRequestManager));
         list.add(Optional.of(topicMetadataRequestManager));
         entries = Collections.unmodifiableList(list);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java
index 4fc08290b71..74d35983291 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java
@@ -50,8 +50,8 @@ public class ErrorBackgroundEvent extends BackgroundEvent {
     @Override
     public String toString() {
         return "ErrorBackgroundEvent{" +
-                toStringBase() +
-                ", error=" + error +
-                '}';
+            toStringBase() +
+            ", error=" + error +
+            '}';
     }
 }
\ No newline at end of file
diff --git 
a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json 
b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json
index a05db31618a..e86a79fcd68 100644
--- 
a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json
+++ 
b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json
@@ -36,7 +36,7 @@
     { "name": "RackId", "type": "string", "versions": "0+",  
"nullableVersions": "0+", "default": "null",
       "about": "null if not provided or if it didn't change since the last 
heartbeat; the rack ID of consumer otherwise." },
     { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", 
"default": -1,
-      "about": "-1 if it didn't chance since the last heartbeat; the maximum 
time in milliseconds that the coordinator will wait on the member to revoke its 
partitions otherwise." },
+      "about": "-1 if it didn't change since the last heartbeat; the maximum 
time in milliseconds that the coordinator will wait on the member to revoke its 
partitions otherwise." },
     { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", 
"nullableVersions": "0+", "default": "null", "entityType": "topicName",
       "about": "null if it didn't change since the last heartbeat; the 
subscribed topic names otherwise." },
     { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java
index bdae8adcbe8..57c61e3dc14 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java
@@ -23,6 +23,7 @@ import org.junit.jupiter.api.Test;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class AssignorSelectionTest {
 
@@ -51,6 +52,7 @@ public class AssignorSelectionTest {
         String assignorName = "uniform";
         AssignorSelection selection = 
AssignorSelection.newServerAssignor(assignorName);
         assertEquals(AssignorSelection.Type.SERVER, selection.type());
-        assertEquals(assignorName, selection.serverAssignor());
+        assertTrue(selection.serverAssignor().isPresent());
+        assertEquals(assignorName, selection.serverAssignor().get());
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
index 137bd106d6e..a7d8ac0a61a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
@@ -86,6 +86,7 @@ public class DefaultBackgroundThreadTest {
     private GroupState groupState;
     private CommitRequestManager commitManager;
     private TopicMetadataRequestManager topicMetadataRequestManager;
+    private HeartbeatRequestManager heartbeatRequestManager;
 
     @BeforeEach
     @SuppressWarnings("unchecked")
@@ -98,6 +99,7 @@ public class DefaultBackgroundThreadTest {
         this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
         this.coordinatorManager = mock(CoordinatorRequestManager.class);
         this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+        this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
         this.errorEventHandler = mock(ErrorEventHandler.class);
         GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(
                 100,
@@ -117,8 +119,9 @@ public class DefaultBackgroundThreadTest {
     public void testStartupAndTearDown() throws InterruptedException {
         
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
         when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
-        
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+        
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+        
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
         DefaultBackgroundThread backgroundThread = mockBackgroundThread();
         backgroundThread.start();
         TestUtils.waitForCondition(backgroundThread::isRunning, "Failed 
awaiting for the background thread to be running");
@@ -130,10 +133,11 @@ public class DefaultBackgroundThreadTest {
     public void testApplicationEvent() {
         this.applicationEventsQueue = new LinkedBlockingQueue<>();
         this.backgroundEventsQueue = new LinkedBlockingQueue<>();
-        
when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
-        
when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-        
when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
-        
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+        when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+        
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+        
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
         DefaultBackgroundThread backgroundThread = mockBackgroundThread();
         ApplicationEvent e = new NoopApplicationEvent("noop event");
         this.applicationEventsQueue.add(e);
@@ -150,10 +154,11 @@ public class DefaultBackgroundThreadTest {
                 this.backgroundEventsQueue,
                 mockRequestManagers(),
                 metadata);
-        
when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
-        
when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-        
when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
-        
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+        when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+        
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+        
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
         DefaultBackgroundThread backgroundThread = mockBackgroundThread();
         ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
         this.applicationEventsQueue.add(e);
@@ -166,10 +171,11 @@ public class DefaultBackgroundThreadTest {
     public void testCommitEvent() {
         this.applicationEventsQueue = new LinkedBlockingQueue<>();
         this.backgroundEventsQueue = new LinkedBlockingQueue<>();
-        
when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
-        
when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-        
when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
-        
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+        when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+        
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+        
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
         DefaultBackgroundThread backgroundThread = mockBackgroundThread();
         ApplicationEvent e = new CommitApplicationEvent(new HashMap<>());
         this.applicationEventsQueue.add(e);
@@ -183,8 +189,9 @@ public class DefaultBackgroundThreadTest {
     public void testListOffsetsEventIsProcessed() {
         
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
         when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
-        
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+        
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+        
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
         this.applicationEventsQueue = new LinkedBlockingQueue<>();
         this.backgroundEventsQueue = new LinkedBlockingQueue<>();
         DefaultBackgroundThread backgroundThread = mockBackgroundThread();
@@ -201,8 +208,9 @@ public class DefaultBackgroundThreadTest {
     public void testResetPositionsEventIsProcessed() {
         
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
         when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
-        
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+        
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+        
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
         this.applicationEventsQueue = new LinkedBlockingQueue<>();
         this.backgroundEventsQueue = new LinkedBlockingQueue<>();
         DefaultBackgroundThread backgroundThread = mockBackgroundThread();
@@ -218,7 +226,7 @@ public class DefaultBackgroundThreadTest {
     public void testResetPositionsProcessFailure() {
         
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
         when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
         this.applicationEventsQueue = new LinkedBlockingQueue<>();
         this.backgroundEventsQueue = new LinkedBlockingQueue<>();
         applicationEventProcessor = spy(new ApplicationEventProcessor(
@@ -242,8 +250,9 @@ public class DefaultBackgroundThreadTest {
     public void testValidatePositionsEventIsProcessed() {
         
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
         when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
-        
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+        
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+        
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
         this.applicationEventsQueue = new LinkedBlockingQueue<>();
         this.backgroundEventsQueue = new LinkedBlockingQueue<>();
         DefaultBackgroundThread backgroundThread = mockBackgroundThread();
@@ -259,7 +268,7 @@ public class DefaultBackgroundThreadTest {
     public void testValidatePositionsProcessFailure() {
         
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
         when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
         this.applicationEventsQueue = new LinkedBlockingQueue<>();
         this.backgroundEventsQueue = new LinkedBlockingQueue<>();
         applicationEventProcessor = spy(new ApplicationEventProcessor(
@@ -295,10 +304,11 @@ public class DefaultBackgroundThreadTest {
         ApplicationEvent e = new AssignmentChangeApplicationEvent(offset, 
currentTimeMs);
         this.applicationEventsQueue.add(e);
 
-        
when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
-        
when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-        
when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
-        
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+        when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+        
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+        
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
 
         backgroundThread.runOnce();
         
verify(applicationEventProcessor).process(any(AssignmentChangeApplicationEvent.class));
@@ -312,10 +322,11 @@ public class DefaultBackgroundThreadTest {
     @Test
     void testFindCoordinator() {
         DefaultBackgroundThread backgroundThread = mockBackgroundThread();
-        
when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
-        
when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-        
when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
-        
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+        when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+        
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+        
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
         backgroundThread.runOnce();
         Mockito.verify(coordinatorManager, times(1)).poll(anyLong());
         Mockito.verify(networkClient, times(1)).poll(anyLong(), anyLong());
@@ -326,10 +337,11 @@ public class DefaultBackgroundThreadTest {
     void testFetchTopicMetadata() {
         this.applicationEventsQueue = new LinkedBlockingQueue<>();
         DefaultBackgroundThread backgroundThread = mockBackgroundThread();
-        
when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
-        
when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-        
when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
-        
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+        when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+        
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
+        
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults());
         this.applicationEventsQueue.add(new 
TopicMetadataApplicationEvent("topic"));
         backgroundThread.runOnce();
         
verify(applicationEventProcessor).process(any(TopicMetadataApplicationEvent.class));
@@ -362,10 +374,11 @@ public class DefaultBackgroundThreadTest {
 
     private RequestManagers mockRequestManagers() {
         return new RequestManagers(
-                offsetsRequestManager,
-                topicMetadataRequestManager,
-                Optional.of(coordinatorManager),
-                Optional.of(commitManager));
+            offsetsRequestManager,
+            topicMetadataRequestManager,
+            Optional.of(coordinatorManager),
+            Optional.of(commitManager),
+            Optional.of(heartbeatRequestManager));
     }
 
     private static NetworkClientDelegate.UnsentRequest 
findCoordinatorUnsentRequest(
@@ -388,20 +401,21 @@ public class DefaultBackgroundThreadTest {
         properties.put(RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS);
 
         return new DefaultBackgroundThread(
-                this.time,
-                new ConsumerConfig(properties),
-                new LogContext(),
-                applicationEventsQueue,
-                backgroundEventsQueue,
-                this.errorEventHandler,
-                applicationEventProcessor,
-                this.metadata,
-                this.networkClient,
-                this.groupState,
-                this.coordinatorManager,
-                this.commitManager,
-                this.offsetsRequestManager,
-                this.topicMetadataRequestManager);
+            this.time,
+            new ConsumerConfig(properties),
+            new LogContext(),
+            applicationEventsQueue,
+            backgroundEventsQueue,
+            this.errorEventHandler,
+            applicationEventProcessor,
+            this.metadata,
+            this.networkClient,
+            this.groupState,
+            this.coordinatorManager,
+            this.commitManager,
+            this.offsetsRequestManager,
+            this.topicMetadataRequestManager,
+            this.heartbeatRequestManager);
     }
 
     private NetworkClientDelegate.PollResult mockPollCoordinatorResult() {
@@ -416,7 +430,7 @@ public class DefaultBackgroundThreadTest {
             Collections.singletonList(findCoordinatorUnsentRequest(time, 
requestTimeoutMs)));
     }
 
-    private NetworkClientDelegate.PollResult emptyPollOffsetsRequestResult() {
+    private NetworkClientDelegate.PollResult emptyPollResults() {
         return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());
     }
 }
\ No newline at end of file
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
new file mode 100644
index 00000000000..598deace7f5
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HeartbeatRequestManagerTest {
+
+    private static final int HEARTBEAT_INTERVAL_MS = 1000;
+    private static final long RETRY_BACKOFF_MAX_MS = 3000;
+    private static final long RETRY_BACKOFF_MS = 100;
+    private static final String GROUP_INSTANCE_ID = "group-instance-id";
+    private static final String GROUP_ID = "group-id";
+
+    private Time time;
+    private LogContext logContext;
+    private CoordinatorRequestManager coordinatorRequestManager;
+    private SubscriptionState subscriptionState;
+    private HeartbeatRequestManager heartbeatRequestManager;
+    private MembershipManager membershipManager;
+    private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
+    private ConsumerConfig config;
+
+    private String memberId = "member-id";
+    private int memberEpoch = 1;
+    private ErrorEventHandler errorEventHandler;
+
+    @BeforeEach
+    public void setUp() {
+        time = new MockTime();
+        logContext = new LogContext();
+        config = new ConsumerConfig(createConsumerConfig());
+        coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", 9999)));
+        subscriptionState = mock(SubscriptionState.class);
+        membershipManager = spy(new MembershipManagerImpl(GROUP_ID));
+        heartbeatRequestState = 
mock(HeartbeatRequestManager.HeartbeatRequestState.class);
+        errorEventHandler = mock(ErrorEventHandler.class);
+        heartbeatRequestManager = createManager();
+    }
+
+    private Properties createConsumerConfig() {
+        Properties properties = new Properties();
+        properties.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        properties.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+        properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+        properties.put(RETRY_BACKOFF_MS_CONFIG, "100");
+        return properties;
+    }
+
+    @Test
+    public void testHeartbeatOnStartup() {
+        // The initial heartbeatInterval is set to 0
+        heartbeatRequestState = new 
HeartbeatRequestManager.HeartbeatRequestState(
+            logContext,
+            time,
+            0,
+            RETRY_BACKOFF_MS,
+            RETRY_BACKOFF_MAX_MS,
+            0);
+        heartbeatRequestManager = createManager();
+        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+
+        // Ensure we do not resend the request without the first request being 
completed
+        NetworkClientDelegate.PollResult result2 = 
heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result2.unsentRequests.size());
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testSendHeartbeatOnMemberState(final boolean 
shouldSendHeartbeat) {
+        // Mocking notInGroup
+        
when(membershipManager.shouldSendHeartbeat()).thenReturn(shouldSendHeartbeat);
+        when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);
+
+        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
+        if (shouldSendHeartbeat) {
+            assertEquals(1, result.unsentRequests.size());
+            assertEquals(0, result.timeUntilNextPollMs);
+        } else {
+            assertEquals(0, result.unsentRequests.size());
+            assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
+
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("stateProvider")
+    public void testTimerNotDue(final MemberState state) {
+        heartbeatRequestState = new 
HeartbeatRequestManager.HeartbeatRequestState(
+            logContext,
+            time,
+            HEARTBEAT_INTERVAL_MS,
+            RETRY_BACKOFF_MS,
+            RETRY_BACKOFF_MAX_MS);
+        heartbeatRequestManager = createManager();
+
+        when(membershipManager.state()).thenReturn(state);
+        time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat 
should be sent
+        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size());
+
+        if (membershipManager.shouldSendHeartbeat()) {
+            assertEquals(HEARTBEAT_INTERVAL_MS - 100, 
result.timeUntilNextPollMs);
+        } else {
+            assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
+        }
+    }
+
+    @Test
+    public void testBackoffOnTimeout() {
+        heartbeatRequestState = new 
HeartbeatRequestManager.HeartbeatRequestState(
+            logContext,
+            time,
+            0,
+            RETRY_BACKOFF_MS,
+            RETRY_BACKOFF_MAX_MS,
+            0);
+        heartbeatRequestManager = createManager();
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", 9999)));
+        when(membershipManager.shouldSendHeartbeat()).thenReturn(true);
+        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+        result.unsentRequests.get(0).future().completeExceptionally(new 
TimeoutException("timeout"));
+
+        // Assure the manager will backoff on timeout
+        time.sleep(RETRY_BACKOFF_MS - 1);
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size());
+
+        time.sleep(1);
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+    }
+
+    @Test
+    public void testFailureOnFatalException() {
+        heartbeatRequestState = spy(new 
HeartbeatRequestManager.HeartbeatRequestState(
+            logContext,
+            time,
+            0,
+            RETRY_BACKOFF_MS,
+            RETRY_BACKOFF_MAX_MS,
+            0));
+        heartbeatRequestManager = createManager();
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", 9999)));
+        when(membershipManager.shouldSendHeartbeat()).thenReturn(true);
+        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+        result.unsentRequests.get(0).future().completeExceptionally(new 
KafkaException("fatal"));
+        verify(membershipManager).transitionToFailed();
+        verify(errorEventHandler).handle(any());
+    }
+
+    @Test
+    public void testNoCoordinator() {
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
+        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
+        assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
+        assertEquals(0, result.unsentRequests.size());
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
+    public void testValidateConsumerGroupHeartbeatRequest(final short version) 
{
+        List<String> subscribedTopics = Collections.singletonList("topic");
+        subscriptionState = new SubscriptionState(logContext, 
OffsetResetStrategy.NONE);
+        subscriptionState.subscribe(new HashSet<>(subscribedTopics), new 
NoOpConsumerRebalanceListener());
+
+        Properties prop = createConsumerConfig();
+        prop.setProperty(MAX_POLL_INTERVAL_MS_CONFIG, "10000");
+        config = new ConsumerConfig(prop);
+        membershipManager = new MembershipManagerImpl(GROUP_ID, 
GROUP_INSTANCE_ID, null);
+        heartbeatRequestState = new 
HeartbeatRequestManager.HeartbeatRequestState(
+            logContext,
+            time,
+            0,
+            RETRY_BACKOFF_MS,
+            RETRY_BACKOFF_MAX_MS,
+            0);
+        heartbeatRequestManager = createManager();
+        // Update membershipManager's memberId and memberEpoch
+        ConsumerGroupHeartbeatResponse result =
+            new ConsumerGroupHeartbeatResponse(new 
ConsumerGroupHeartbeatResponseData()
+            .setMemberId(memberId)
+            .setMemberEpoch(memberEpoch));
+        membershipManager.updateState(result.data());
+
+        // Create a ConsumerHeartbeatRequest and verify the payload
+        NetworkClientDelegate.PollResult pollResult = 
heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, pollResult.unsentRequests.size());
+        NetworkClientDelegate.UnsentRequest request = 
pollResult.unsentRequests.get(0);
+        assertTrue(request.requestBuilder() instanceof 
ConsumerGroupHeartbeatRequest.Builder);
+
+        ConsumerGroupHeartbeatRequest heartbeatRequest =
+            (ConsumerGroupHeartbeatRequest) 
request.requestBuilder().build(version);
+        assertEquals(GROUP_ID, heartbeatRequest.data().groupId());
+        assertEquals(memberId, heartbeatRequest.data().memberId());
+        assertEquals(memberEpoch, heartbeatRequest.data().memberEpoch());
+        assertEquals(10000, heartbeatRequest.data().rebalanceTimeoutMs());
+        assertEquals(subscribedTopics, 
heartbeatRequest.data().subscribedTopicNames());
+        assertEquals(GROUP_INSTANCE_ID, heartbeatRequest.data().instanceId());
+        // TODO: Test pattern subscription and user provided assignor 
selection.
+        assertNull(heartbeatRequest.data().serverAssignor());
+        assertNull(heartbeatRequest.data().subscribedTopicRegex());
+    }
+
+    @ParameterizedTest
+    @MethodSource("errorProvider")
+    public void testHeartbeatResponseOnErrorHandling(final Errors error, final 
boolean isFatal) {
+        heartbeatRequestState = new 
HeartbeatRequestManager.HeartbeatRequestState(
+            logContext,
+            time,
+            HEARTBEAT_INTERVAL_MS,
+            RETRY_BACKOFF_MS,
+            RETRY_BACKOFF_MAX_MS,
+            0);
+        heartbeatRequestManager = createManager();
+
+        // Sending first heartbeat w/o assignment to set the state to STABLE
+        ConsumerGroupHeartbeatResponse rs1 = new 
ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
+            .setHeartbeatIntervalMs(HEARTBEAT_INTERVAL_MS)
+            .setMemberId(memberId)
+            .setMemberEpoch(memberEpoch));
+        membershipManager.updateState(rs1.data());
+        assertEquals(MemberState.STABLE, membershipManager.state());
+
+        // Handling errors on the second heartbeat
+        time.sleep(HEARTBEAT_INTERVAL_MS);
+        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+
+        // Manually completing the response to test error handling
+        ClientResponse response = createHeartbeatResponse(
+            result.unsentRequests.get(0),
+            error);
+        result.unsentRequests.get(0).future().complete(response);
+        ConsumerGroupHeartbeatResponse mockResponse = 
(ConsumerGroupHeartbeatResponse) response.responseBody();
+
+        switch (error) {
+            case NONE:
+                verify(errorEventHandler, never()).handle(any());
+                verify(membershipManager, 
times(2)).updateState(mockResponse.data());
+                assertEquals(HEARTBEAT_INTERVAL_MS, 
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()));
+                break;
+
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                verify(errorEventHandler, never()).handle(any());
+                assertEquals(RETRY_BACKOFF_MS, 
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()));
+                break;
+
+            case COORDINATOR_NOT_AVAILABLE:
+            case NOT_COORDINATOR:
+                verify(errorEventHandler, never()).handle(any());
+                
verify(coordinatorRequestManager).markCoordinatorUnknown(any(), anyLong());
+                assertEquals(0, 
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()));
+                break;
+
+            default:
+                if (isFatal) {
+                    // The memberStateManager should have stopped heartbeat at 
this point
+                    ensureFatalError();
+                } else {
+                    verify(errorEventHandler, never()).handle(any());
+                    assertEquals(0, 
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()));
+                }
+                break;
+        }
+    }
+
+    private void ensureFatalError() {
+        verify(membershipManager).transitionToFailed();
+        verify(errorEventHandler).handle(any());
+        ensureHeartbeatStopped();
+    }
+
+    private void ensureHeartbeatStopped() {
+        time.sleep(HEARTBEAT_INTERVAL_MS);
+        assertEquals(MemberState.FAILED, membershipManager.state());
+        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size());
+    }
+
+    // error, isFatal
+    private static Collection<Arguments> errorProvider() {
+        return Arrays.asList(
+            Arguments.of(Errors.NONE, false),
+            Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE, false),
+            Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS, false),
+            Arguments.of(Errors.NOT_COORDINATOR, false),
+            Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED, true),
+            Arguments.of(Errors.INVALID_REQUEST, true),
+            Arguments.of(Errors.UNKNOWN_MEMBER_ID, false),
+            Arguments.of(Errors.FENCED_MEMBER_EPOCH, false),
+            Arguments.of(Errors.UNSUPPORTED_ASSIGNOR, true),
+            Arguments.of(Errors.UNSUPPORTED_VERSION, true),
+            Arguments.of(Errors.UNRELEASED_INSTANCE_ID, true),
+            Arguments.of(Errors.GROUP_MAX_SIZE_REACHED, true));
+    }
+
+    private static Collection<Arguments> stateProvider() {
+        return Arrays.asList(
+            Arguments.of(MemberState.UNJOINED),
+            Arguments.of(MemberState.RECONCILING),
+            Arguments.of(MemberState.FAILED),
+            Arguments.of(MemberState.STABLE),
+            Arguments.of(MemberState.FENCED));
+    }
+
+    private ClientResponse createHeartbeatResponse(
+        final NetworkClientDelegate.UnsentRequest request,
+        final Errors error) {
+        ConsumerGroupHeartbeatResponseData data = new 
ConsumerGroupHeartbeatResponseData()
+            .setErrorCode(error.code())
+            .setHeartbeatIntervalMs(HEARTBEAT_INTERVAL_MS)
+            .setMemberId(memberId)
+            .setMemberEpoch(memberEpoch);
+        if (error != Errors.NONE) {
+            data.setErrorMessage("stubbed error message");
+        }
+        ConsumerGroupHeartbeatResponse response = new 
ConsumerGroupHeartbeatResponse(data);
+        return new ClientResponse(
+            new RequestHeader(ApiKeys.CONSUMER_GROUP_HEARTBEAT, 
ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion(), "client-id", 1),
+            request.callback(),
+            "0",
+            time.milliseconds(),
+            time.milliseconds(),
+            false,
+            null,
+            null,
+            response);
+    }
+
+    private HeartbeatRequestManager createManager() {
+        return new HeartbeatRequestManager(
+            logContext,
+            time,
+            config,
+            coordinatorRequestManager,
+            subscriptionState,
+            membershipManager,
+            heartbeatRequestState,
+            errorEventHandler);
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index 09e47acb416..4b6aa80c04e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -22,8 +22,6 @@ import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
 
 import java.util.Arrays;
 
@@ -84,9 +82,23 @@ public class MembershipManagerImplTest {
         assertEquals(MemberState.RECONCILING, membershipManager.state());
     }
 
-    @ParameterizedTest
-    @EnumSource(Errors.class)
-    public void testMemberIdAndEpochResetOnErrors(Errors error) {
+    @Test
+    public void testMemberIdAndEpochResetOnFencedMembers() {
+        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID);
+        ConsumerGroupHeartbeatResponse heartbeatResponse =
+                createConsumerGroupHeartbeatResponse(null);
+        membershipManager.updateState(heartbeatResponse.data());
+        assertEquals(MemberState.STABLE, membershipManager.state());
+        assertEquals(MEMBER_ID, membershipManager.memberId());
+        assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
+
+        membershipManager.transitionToFenced();
+        assertFalse(membershipManager.memberId().isEmpty());
+        assertEquals(0, membershipManager.memberEpoch());
+    }
+
+    @Test
+    public void testTransitionToFailure() {
         MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID);
         ConsumerGroupHeartbeatResponse heartbeatResponse =
                 createConsumerGroupHeartbeatResponse(null);
@@ -95,23 +107,8 @@ public class MembershipManagerImplTest {
         assertEquals(MEMBER_ID, membershipManager.memberId());
         assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
 
-        if (error == Errors.UNKNOWN_MEMBER_ID || error == 
Errors.FENCED_MEMBER_EPOCH) {
-            // Should reset member epoch and keep member id
-            ConsumerGroupHeartbeatResponse heartbeatResponseWithMemberIdError =
-                    
createConsumerGroupHeartbeatResponseWithError(Errors.FENCED_MEMBER_EPOCH);
-            
membershipManager.updateState(heartbeatResponseWithMemberIdError.data());
-
-            assertFalse(membershipManager.memberId().isEmpty());
-            assertEquals(0, membershipManager.memberEpoch());
-        } else {
-            // Should not reset member id or epoch
-            ConsumerGroupHeartbeatResponse heartbeatResponseWithError =
-                    createConsumerGroupHeartbeatResponseWithError(error);
-            membershipManager.updateState(heartbeatResponseWithError.data());
-
-            assertFalse(membershipManager.memberId().isEmpty());
-            assertNotEquals(0, membershipManager.memberEpoch());
-        }
+        membershipManager.transitionToFailed();
+        assertEquals(MemberState.FAILED, membershipManager.state());
     }
 
     @Test

Reply via email to