This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 59e5890505b KAFKA-18736: Decide when a heartbeat should be sent 
(#19121)
59e5890505b is described below

commit 59e5890505b944a7911a676371a259027f9f468d
Author: Bruno Cadonna <[email protected]>
AuthorDate: Mon Mar 10 17:39:57 2025 +0100

    KAFKA-18736: Decide when a heartbeat should be sent (#19121)
    
    This commit adds the conditions to decide when a Streams group heartbeat
    should be sent.
    A heartbeat should be sent when:
    - the group coordinator is available
    - the member is part of the Streams group or wants to join it
    - the heartbeat interval expired or the member is leaving the group or
    acknowledging the assignment
    
    This commit does not implement:
    - not sending fields that did not change
    - handling errors
    
    Reviewers: Zheguang Zhao <[email protected]>, Lucas
    Brutschy <[email protected]>
---
 .../StreamsGroupHeartbeatRequestManager.java       | 142 +++++-
 .../internals/StreamsMembershipManager.java        |   2 +-
 .../StreamsGroupHeartbeatRequestManagerTest.java   | 525 +++++++++++++++++----
 .../internals/StreamsMembershipManagerTest.java    |  20 +-
 4 files changed, 572 insertions(+), 117 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index 4e8bff80366..0f17353492e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -18,6 +18,8 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
@@ -28,6 +30,7 @@ import 
org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;
 import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
 
 import org.slf4j.Logger;
 
@@ -43,6 +46,17 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+/**
+ * <p>Manages the request creation and response handling for the Streams group 
heartbeat. The class creates a
+ * heartbeat request using the state stored in the membership manager. The 
requests can be retrieved
+ * by calling {@link StreamsGroupHeartbeatRequestManager#poll(long)}. Once the 
response is received, it updates the
+ * state in the membership manager and handles any errors.
+ *
+ * <p>The heartbeat manager generates heartbeat requests based on the member 
state. It's also responsible
+ * for the timing of the heartbeat requests to ensure they are sent according 
to the heartbeat interval
+ * (while the member state is stable) or on demand (while the member is 
acknowledging an assignment or
+ * leaving the group).
+ */
 public class StreamsGroupHeartbeatRequestManager implements RequestManager {
 
     static class HeartbeatState {
@@ -59,6 +73,9 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
             this.rebalanceTimeoutMs = rebalanceTimeoutMs;
         }
 
+        public void reset() {
+        }
+
         public StreamsGroupHeartbeatRequestData buildRequestData() {
             StreamsGroupHeartbeatRequestData data = new 
StreamsGroupHeartbeatRequestData();
             data.setGroupId(membershipManager.groupId());
@@ -205,7 +222,6 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
         }
     }
 
-
     private final Logger logger;
 
     private final int maxPollIntervalMs;
@@ -218,15 +234,24 @@ public class StreamsGroupHeartbeatRequestManager 
implements RequestManager {
 
     private final StreamsMembershipManager membershipManager;
 
+    private final BackgroundEventHandler backgroundEventHandler;
+
     private final HeartbeatMetricsManager metricsManager;
 
     private StreamsRebalanceData streamsRebalanceData;
 
+    /**
+     * Timer for tracking the time since the last consumer poll.  If the timer 
expires, the consumer will stop
+     * sending heartbeat until the next poll.
+     */
+    private final Timer pollTimer;
+
     public StreamsGroupHeartbeatRequestManager(final LogContext logContext,
                                                final Time time,
                                                final ConsumerConfig config,
                                                final CoordinatorRequestManager 
coordinatorRequestManager,
                                                final StreamsMembershipManager 
membershipManager,
+                                               final BackgroundEventHandler 
backgroundEventHandler,
                                                final Metrics metrics,
                                                final StreamsRebalanceData 
streamsRebalanceData) {
         this.logger = logContext.logger(getClass());
@@ -238,6 +263,10 @@ public class StreamsGroupHeartbeatRequestManager 
implements RequestManager {
             membershipManager,
             "Streams membership manager cannot be null"
         );
+        this.backgroundEventHandler = Objects.requireNonNull(
+            backgroundEventHandler,
+            "Background event handler cannot be null"
+        );
         this.metricsManager = new HeartbeatMetricsManager(
             Objects.requireNonNull(metrics, "Metrics cannot be null")
         );
@@ -254,31 +283,119 @@ public class StreamsGroupHeartbeatRequestManager 
implements RequestManager {
             retryBackoffMaxMs,
             maxPollIntervalMs
         );
+        this.pollTimer = time.timer(maxPollIntervalMs);
     }
 
+    /**
+     * This will build a heartbeat request if one must be sent, determined 
based on the member
+     * state. A heartbeat is sent when all of the following applies:
+     * <ol>
+     *     <li>Member is part of the Streams group or wants to join it.</li>
+     *     <li>The heartbeat interval has expired, or the member is in a state 
that indicates
+     *     that it should heartbeat without waiting for the interval.</li>
+     * </ol>
+     * This will also determine the maximum wait time until the next poll 
based on the member's
+     * state.
+     * <ol>
+     *     <li>If the member is without a coordinator or is in a failed state, 
the timer is set
+     *     to Long.MAX_VALUE, as there's no need to send a heartbeat.</li>
+     *     <li>If the member cannot send a heartbeat due to exponential 
backoff, it will
+     *     return the remaining time left on the backoff timer.</li>
+     *     <li>If the member's heartbeat timer has not expired, it will return 
the remaining time
+     *     left on the heartbeat timer.</li>
+     *     <li>If the member can send a heartbeat, the timer is set to the 
current heartbeat interval.</li>
+     * </ol>
+     *
+     * @return {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
that includes a
+     *         heartbeat request if one must be sent, and the time to wait 
until the next poll.
+     */
     @Override
     public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
-        return new NetworkClientDelegate.PollResult(
-            heartbeatRequestState.heartbeatIntervalMs(),
-            Collections.singletonList(makeHeartbeatRequest(currentTimeMs))
-        );
+        if (coordinatorRequestManager.coordinator().isEmpty() || 
membershipManager.shouldSkipHeartbeat()) {
+            membershipManager.onHeartbeatRequestSkipped();
+            maybePropagateCoordinatorFatalErrorEvent();
+            return NetworkClientDelegate.PollResult.EMPTY;
+        }
+        pollTimer.update(currentTimeMs);
+        if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+            logger.warn("Consumer poll timeout has expired. This means the 
time between " +
+                "subsequent calls to poll() was longer than the configured 
max.poll.interval.ms, " +
+                "which typically implies that the poll loop is spending too 
much time processing " +
+                "messages. You can address this either by increasing 
max.poll.interval.ms or by " +
+                "reducing the maximum size of batches returned in poll() with 
max.poll.records.");
+
+            membershipManager.onPollTimerExpired();
+            NetworkClientDelegate.UnsentRequest leaveHeartbeat = 
makeHeartbeatRequestAndLogResponse(currentTimeMs);
+
+            // We can ignore the leave response because we can join before or 
after receiving the response.
+            heartbeatRequestState.reset();
+            heartbeatState.reset();
+            return new 
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), 
Collections.singletonList(leaveHeartbeat));
+        }
+        if (shouldHeartbeatBeforeIntervalExpires() || 
heartbeatRequestState.canSendRequest(currentTimeMs)) {
+            NetworkClientDelegate.UnsentRequest request = 
makeHeartbeatRequestAndHandleResponse(currentTimeMs);
+            return new 
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), 
Collections.singletonList(request));
+        } else {
+            return new 
NetworkClientDelegate.PollResult(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
+        }
     }
 
-    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final 
long currentTimeMs) {
-        NetworkClientDelegate.UnsentRequest request = new 
NetworkClientDelegate.UnsentRequest(
-            new 
StreamsGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
-            coordinatorRequestManager.coordinator()
-        );
-        request.whenComplete((response, exception) -> {
+    /**
+     * A heartbeat should be sent without waiting for the heartbeat interval 
to expire if:
+     * - the member is leaving the group
+     * or
+     * - the member is joining the group or acknowledging the assignment and 
for both cases there is no heartbeat request
+     *   in flight.
+     *
+     * @return true if a heartbeat should be sent before the interval expires, 
false otherwise
+     */
+    private boolean shouldHeartbeatBeforeIntervalExpires() {
+        return membershipManager.state() == MemberState.LEAVING
+            ||
+            (membershipManager.state() == MemberState.JOINING || 
membershipManager.state() == MemberState.ACKNOWLEDGING)
+                && !heartbeatRequestState.requestInFlight();
+    }
+
+    private void maybePropagateCoordinatorFatalErrorEvent() {
+        coordinatorRequestManager.getAndClearFatalError()
+            .ifPresent(fatalError -> backgroundEventHandler.add(new 
ErrorEvent(fatalError)));
+    }
+
+    private NetworkClientDelegate.UnsentRequest 
makeHeartbeatRequestAndLogResponse(final long currentTimeMs) {
+        return makeHeartbeatRequest(currentTimeMs).whenComplete((response, 
exception) -> {
+            if (response != null) {
+                
metricsManager.recordRequestLatency(response.requestLatencyMs());
+                Errors error = Errors.forCode(((StreamsGroupHeartbeatResponse) 
response.responseBody()).data().errorCode());
+                if (error == Errors.NONE)
+                    logger.debug("StreamsGroupHeartbeatRequest responded 
successfully: {}", response);
+                else
+                    logger.error("StreamsGroupHeartbeatRequest failed because 
of {}: {}", error, response);
+            } else {
+                logger.error("StreamsGroupHeartbeatRequest failed because of 
unexpected exception.", exception);
+            }
+        });
+    }
+
+    private NetworkClientDelegate.UnsentRequest 
makeHeartbeatRequestAndHandleResponse(final long currentTimeMs) {
+        NetworkClientDelegate.UnsentRequest request = 
makeHeartbeatRequest(currentTimeMs);
+        return request.whenComplete((response, exception) -> {
             long completionTimeMs = request.handler().completionTimeMs();
             if (response != null) {
                 
metricsManager.recordRequestLatency(response.requestLatencyMs());
                 onResponse((StreamsGroupHeartbeatResponse) 
response.responseBody(), completionTimeMs);
             }
         });
+    }
+
+    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final 
long currentTimeMs) {
+        NetworkClientDelegate.UnsentRequest request = new 
NetworkClientDelegate.UnsentRequest(
+            new 
StreamsGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
+            coordinatorRequestManager.coordinator()
+        );
         heartbeatRequestState.onSendAttempt(currentTimeMs);
         membershipManager.onHeartbeatRequestGenerated();
         metricsManager.recordHeartbeatSentMs(currentTimeMs);
+        heartbeatRequestState.resetTimer();
         return request;
     }
 
@@ -290,17 +407,14 @@ public class StreamsGroupHeartbeatRequestManager 
implements RequestManager {
 
     private void onSuccessResponse(final StreamsGroupHeartbeatResponse 
response, final long currentTimeMs) {
         final StreamsGroupHeartbeatResponseData data = response.data();
-
         
heartbeatRequestState.updateHeartbeatIntervalMs(data.heartbeatIntervalMs());
         heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
-        heartbeatRequestState.resetTimer();
 
         if (data.partitionsByUserEndpoint() != null) {
             streamsRebalanceData.setPartitionsByHost(convertHostInfoMap(data));
         }
 
         List<StreamsGroupHeartbeatResponseData.Status> statuses = 
data.status();
-
         if (statuses != null && !statuses.isEmpty()) {
             String statusDetails = statuses.stream()
                 .map(status -> "(" + status.statusCode() + ") " + 
status.statusDetail())
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
index 0d281cd82d9..f5f269f52b5 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
@@ -591,7 +591,7 @@ public class StreamsMembershipManager implements 
RequestManager {
      * @return True if the member should send heartbeat to the coordinator 
without waiting for
      * the interval.
      */
-    public boolean shouldHeartbeatNow() {
+    public boolean shouldNotWaitForHeartbeatInterval() {
         return state == MemberState.ACKNOWLEDGING || state == 
MemberState.LEAVING || state == MemberState.JOINING;
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index f0c0f6f207f..47b27d52e29 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -18,6 +18,8 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
@@ -31,10 +33,15 @@ import 
org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mock;
+import org.mockito.MockedConstruction;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 import java.util.Arrays;
@@ -47,10 +54,16 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 
+import static 
org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -58,7 +71,7 @@ import static org.mockito.Mockito.when;
 class StreamsGroupHeartbeatRequestManagerTest {
 
     private static final LogContext LOG_CONTEXT = new LogContext("test");
-    private static final int RECEIVED_HEARTBEAT_INTERVAL_MS = 1200;
+    private static final long RECEIVED_HEARTBEAT_INTERVAL_MS = 1200;
     private static final int DEFAULT_MAX_POLL_INTERVAL_MS = 10000;
     private static final String GROUP_ID = "group-id";
     private static final String MEMBER_ID = "member-id";
@@ -105,7 +118,9 @@ class StreamsGroupHeartbeatRequestManagerTest {
     );
     private static final Map<String, StreamsRebalanceData.Subtopology> 
SUBTOPOLOGIES =
         Map.of(SUBTOPOLOGY_NAME_1, SUBTOPOLOGY_1);
-    private static final Map<String, String> CLIENT_TAGS = 
Map.of("clientTag1", "value2");
+    private static final String CLIENT_TAG_1 = "client-tag1";
+    private static final String VALUE_1 = "value1";
+    private static final Map<String, String> CLIENT_TAGS = 
Map.of(CLIENT_TAG_1, VALUE_1);
     private static final 
List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
ENDPOINT_TO_PARTITIONS =
         List.of(
             new StreamsGroupHeartbeatResponseData.EndpointToPartitions()
@@ -132,18 +147,13 @@ class StreamsGroupHeartbeatRequestManagerTest {
     @Mock
     private StreamsMembershipManager membershipManager;
 
+    @Mock
+    private BackgroundEventHandler backgroundEventHandler;
+
     private final Metrics metrics = new Metrics(time);
 
     private final Node coordinatorNode = new Node(1, "localhost", 9092);
 
-    private static ConsumerConfig config() {
-        Properties prop = new Properties();
-        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-        prop.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
String.valueOf(DEFAULT_MAX_POLL_INTERVAL_MS));
-        return new ConsumerConfig(prop);
-    }
-
     @Test
     public void testConstructWithNullCoordinatorRequestManager() {
         final Exception exception = assertThrows(NullPointerException.class, 
() -> new StreamsGroupHeartbeatRequestManager(
@@ -152,6 +162,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
             config,
             null,
             membershipManager,
+            backgroundEventHandler,
             metrics,
             streamsRebalanceData
         ));
@@ -166,12 +177,28 @@ class StreamsGroupHeartbeatRequestManagerTest {
             config,
             coordinatorRequestManager,
             null,
+            backgroundEventHandler,
             metrics,
             streamsRebalanceData
         ));
         assertEquals("Streams membership manager cannot be null", 
exception.getMessage());
     }
 
+    @Test
+    public void testConstructWithNullBackgroundEventHandler() {
+        final Exception exception = assertThrows(NullPointerException.class, 
() -> new StreamsGroupHeartbeatRequestManager(
+            new LogContext("test"),
+            time,
+            config,
+            coordinatorRequestManager,
+            membershipManager,
+            null,
+            metrics,
+            streamsRebalanceData
+        ));
+        assertEquals("Background event handler cannot be null", 
exception.getMessage());
+    }
+
     @Test
     public void testConstructWithNullMetrics() {
         final Exception exception = assertThrows(NullPointerException.class, 
() -> new StreamsGroupHeartbeatRequestManager(
@@ -180,6 +207,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
             config,
             coordinatorRequestManager,
             membershipManager,
+            backgroundEventHandler,
             null,
             streamsRebalanceData
         ));
@@ -194,103 +222,416 @@ class StreamsGroupHeartbeatRequestManagerTest {
             config,
             coordinatorRequestManager,
             membershipManager,
+            backgroundEventHandler,
             metrics,
             null
         ));
         assertEquals("Streams rebalance data cannot be null", 
exception.getMessage());
     }
 
+    @Test
+    public void testNoHeartbeatIfCoordinatorUnknown() {
+        try (final MockedConstruction<Timer> pollTimerMockedConstruction = 
mockConstruction(Timer.class)) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager 
= createStreamsGroupHeartbeatRequestManager();
+            final Timer pollTimer = 
pollTimerMockedConstruction.constructed().get(0);
+            
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
+
+            final NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
+            assertEquals(0, result.unsentRequests.size());
+            verify(membershipManager).onHeartbeatRequestSkipped();
+            verify(pollTimer, never()).update();
+        }
+    }
+
+    @Test
+    public void testNoHeartbeatIfHeartbeatSkipped() {
+        try (final MockedConstruction<Timer> pollTimerMockedConstruction = 
mockConstruction(Timer.class)) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager 
= createStreamsGroupHeartbeatRequestManager();
+            final Timer pollTimer = 
pollTimerMockedConstruction.constructed().get(0);
+            
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+            when(membershipManager.shouldSkipHeartbeat()).thenReturn(true);
+
+            final NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
+            assertEquals(0, result.unsentRequests.size());
+            verify(membershipManager).onHeartbeatRequestSkipped();
+            verify(pollTimer, never()).update();
+        }
+    }
+
+    @Test
+    public void testPropagateCoordinatorFatalErrorToApplicationThread() {
+        final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = 
createStreamsGroupHeartbeatRequestManager();
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
+        final Throwable fatalError = new RuntimeException("KABOOM");
+        
when(coordinatorRequestManager.getAndClearFatalError()).thenReturn(Optional.of(fatalError));
+
+        final NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
+        assertEquals(0, result.unsentRequests.size());
+        verify(membershipManager).onHeartbeatRequestSkipped();
+        verify(backgroundEventHandler).add(argThat(
+            errorEvent -> errorEvent instanceof ErrorEvent && ((ErrorEvent) 
errorEvent).error() == fatalError));
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testSendingHeartbeatIfMemberIsLeaving(final boolean 
requestInFlight) {
+        final long heartbeatIntervalMs = 1234;
+        try (
+            final MockedConstruction<HeartbeatRequestState> 
heartbeatRequestStateMockedConstruction = mockConstruction(
+                HeartbeatRequestState.class,
+                (mock, context) -> {
+                    
when(mock.canSendRequest(time.milliseconds())).thenReturn(false);
+                    
when(mock.heartbeatIntervalMs()).thenReturn(heartbeatIntervalMs);
+                    when(mock.requestInFlight()).thenReturn(requestInFlight);
+                });
+             final MockedConstruction<Timer> pollTimerMockedConstruction = 
mockConstruction(Timer.class)
+        ) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager 
= createStreamsGroupHeartbeatRequestManager();
+            final Timer pollTimer = 
pollTimerMockedConstruction.constructed().get(0);
+            
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+            when(membershipManager.state()).thenReturn(MemberState.LEAVING);
+
+            final NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
+            assertEquals(1, result.unsentRequests.size());
+            assertEquals(heartbeatIntervalMs, result.timeUntilNextPollMs);
+            verify(pollTimer).update(time.milliseconds());
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = MemberState.class, names = {"JOINING", 
"ACKNOWLEDGING"})
+    public void testSendingHeartbeatIfMemberIsJoiningOrAcknowledging(final 
MemberState memberState) {
+        final long heartbeatIntervalMs = 1234;
+        try (
+            final MockedConstruction<HeartbeatRequestState> 
heartbeatRequestStateMockedConstruction = mockConstruction(
+                HeartbeatRequestState.class,
+                (mock, context) -> {
+                    
when(mock.canSendRequest(time.milliseconds())).thenReturn(false);
+                    
when(mock.heartbeatIntervalMs()).thenReturn(heartbeatIntervalMs);
+                });
+             final MockedConstruction<Timer> pollTimerMockedConstruction = 
mockConstruction(Timer.class)
+        ) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager 
= createStreamsGroupHeartbeatRequestManager();
+            final Timer pollTimer = 
pollTimerMockedConstruction.constructed().get(0);
+            
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+            when(membershipManager.state()).thenReturn(memberState);
+
+            final NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
+            assertEquals(1, result.unsentRequests.size());
+            assertEquals(heartbeatIntervalMs, result.timeUntilNextPollMs);
+            verify(pollTimer).update(time.milliseconds());
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = MemberState.class, names = {"JOINING", 
"ACKNOWLEDGING"})
+    public void 
testNotSendingHeartbeatIfMemberIsJoiningOrAcknowledgingWhenHeartbeatInFlight(final
 MemberState memberState) {
+        final long timeToNextHeartbeatMs = 1234;
+        try (
+            final MockedConstruction<HeartbeatRequestState> 
heartbeatRequestStateMockedConstruction = mockConstruction(
+                HeartbeatRequestState.class,
+                (mock, context) -> {
+                    
when(mock.canSendRequest(time.milliseconds())).thenReturn(false);
+                    
when(mock.timeToNextHeartbeatMs(time.milliseconds())).thenReturn(timeToNextHeartbeatMs);
+                    when(mock.requestInFlight()).thenReturn(true);
+                });
+            final MockedConstruction<Timer> pollTimerMockedConstruction = 
mockConstruction(Timer.class)
+        ) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager 
= createStreamsGroupHeartbeatRequestManager();
+            final Timer pollTimer = 
pollTimerMockedConstruction.constructed().get(0);
+            
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+            when(membershipManager.state()).thenReturn(memberState);
+
+            final NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
+            assertEquals(0, result.unsentRequests.size());
+            assertEquals(timeToNextHeartbeatMs, result.timeUntilNextPollMs);
+            verify(pollTimer).update(time.milliseconds());
+        }
+    }
+
+    @Test
+    public void testSendingHeartbeatIfHeartbeatCanBeSent() {
+        final long heartbeatIntervalMs = 1234;
+        try (
+            final MockedConstruction<HeartbeatRequestState> 
heartbeatRequestStateMockedConstruction = mockConstruction(
+                HeartbeatRequestState.class,
+                (mock, context) -> {
+                    
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
+                    
when(mock.heartbeatIntervalMs()).thenReturn(heartbeatIntervalMs);
+
+                });
+            final MockedConstruction<Timer> pollTimerMockedConstruction = 
mockConstruction(Timer.class)
+        ) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager 
= createStreamsGroupHeartbeatRequestManager();
+            final Timer pollTimer = 
pollTimerMockedConstruction.constructed().get(0);
+            
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+            when(membershipManager.state()).thenReturn(MemberState.STABLE);
+
+            final NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
+            assertEquals(1, result.unsentRequests.size());
+            assertEquals(heartbeatIntervalMs, result.timeUntilNextPollMs);
+            verify(pollTimer).update(time.milliseconds());
+        }
+    }
+
+    @Test
+    public void testNotSendingHeartbeatIfHeartbeatCannotBeSent() {
+        final long timeToNextHeartbeatMs = 1234;
+        try (
+            final MockedConstruction<HeartbeatRequestState> 
heartbeatRequestStateMockedConstruction = mockConstruction(
+                HeartbeatRequestState.class,
+                (mock, context) -> {
+                    
when(mock.canSendRequest(time.milliseconds())).thenReturn(false);
+                    
when(mock.timeToNextHeartbeatMs(time.milliseconds())).thenReturn(timeToNextHeartbeatMs);
+                });
+            final MockedConstruction<Timer> pollTimerMockedConstruction = 
mockConstruction(Timer.class)
+        ) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager 
= createStreamsGroupHeartbeatRequestManager();
+            final Timer pollTimer = 
pollTimerMockedConstruction.constructed().get(0);
+            
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+
+            final NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
+            assertEquals(0, result.unsentRequests.size());
+            assertEquals(timeToNextHeartbeatMs, result.timeUntilNextPollMs);
+            verify(pollTimer).update(time.milliseconds());
+        }
+    }
+
+    @Test
+    public void testSendingLeaveHeartbeatIfPollTimerExpired() {
+        final long heartbeatIntervalMs = 1234;
+        try (
+            final MockedConstruction<HeartbeatRequestState> 
heartbeatRequestStateMockedConstruction = mockConstruction(
+                HeartbeatRequestState.class,
+                (mock, context) -> {
+                    
when(mock.heartbeatIntervalMs()).thenReturn(heartbeatIntervalMs);
+                });
+            final MockedConstruction<Timer> pollTimerMockedConstruction = 
mockConstruction(
+                Timer.class,
+                (mock, context) -> {
+                    when(mock.isExpired()).thenReturn(true);
+                });
+            final 
MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> 
heartbeatStateMockedConstruction = mockConstruction(
+                StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
+        ) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager 
= createStreamsGroupHeartbeatRequestManager();
+            final HeartbeatRequestState heartbeatRequestState = 
heartbeatRequestStateMockedConstruction.constructed().get(0);
+            final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
+            final Timer pollTimer = 
pollTimerMockedConstruction.constructed().get(0);
+            
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+
+            final NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
+            assertEquals(1, result.unsentRequests.size());
+            assertEquals(heartbeatIntervalMs, result.timeUntilNextPollMs);
+            verify(pollTimer).update(time.milliseconds());
+            verify(membershipManager).onPollTimerExpired();
+            verify(heartbeatRequestState).reset();
+            verify(heartbeatState).reset();
+        }
+    }
+
+    @Test
+    public void 
testNotSendingLeaveHeartbeatIfPollTimerExpiredAndMemberIsLeaving() {
+        final long timeToNextHeartbeatMs = 1234;
+        try (
+            final MockedConstruction<HeartbeatRequestState> 
heartbeatRequestStateMockedConstruction = mockConstruction(
+                HeartbeatRequestState.class,
+                (mock, context) -> {
+                    
when(mock.timeToNextHeartbeatMs(time.milliseconds())).thenReturn(timeToNextHeartbeatMs);
+                });
+            final MockedConstruction<Timer> pollTimerMockedConstruction = 
mockConstruction(
+                Timer.class,
+                (mock, context) -> {
+                    when(mock.isExpired()).thenReturn(true);
+                });
+            final 
MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> 
heartbeatStateMockedConstruction = mockConstruction(
+                StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
+        ) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager 
= createStreamsGroupHeartbeatRequestManager();
+            final HeartbeatRequestState heartbeatRequestState = 
heartbeatRequestStateMockedConstruction.constructed().get(0);
+            final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
+            final Timer pollTimer = 
pollTimerMockedConstruction.constructed().get(0);
+            
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+            when(membershipManager.isLeavingGroup()).thenReturn(true);
+            
when(membershipManager.state()).thenReturn(MemberState.PREPARE_LEAVING);
+
+            final NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
+            assertEquals(0, result.unsentRequests.size());
+            assertEquals(timeToNextHeartbeatMs, result.timeUntilNextPollMs);
+            verify(pollTimer).update(time.milliseconds());
+            verify(membershipManager, never()).onPollTimerExpired();
+            verify(heartbeatRequestState, never()).reset();
+            verify(heartbeatState, never()).reset();
+        }
+    }
+
     @Test
     public void testSendingFullHeartbeatRequest() {
-        final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = 
new StreamsGroupHeartbeatRequestManager(
+        try (
+            final MockedConstruction<HeartbeatRequestState> 
heartbeatRequestStateMockedConstruction = mockConstruction(
+                HeartbeatRequestState.class,
+                (mock, context) -> {
+                    
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
+                })
+        ) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager 
= createStreamsGroupHeartbeatRequestManager();
+            final HeartbeatRequestState heartbeatRequestState = 
heartbeatRequestStateMockedConstruction.constructed().get(0);
+            
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+            when(membershipManager.groupId()).thenReturn(GROUP_ID);
+            when(membershipManager.memberId()).thenReturn(MEMBER_ID);
+            when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
+            
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
+
+            final NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
+            assertEquals(0, result.timeUntilNextPollMs);
+            assertEquals(1, result.unsentRequests.size());
+            assertEquals(Optional.of(coordinatorNode), 
result.unsentRequests.get(0).node());
+            NetworkClientDelegate.UnsentRequest networkRequest = 
result.unsentRequests.get(0);
+            StreamsGroupHeartbeatRequest streamsRequest = 
(StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build();
+            assertEquals(GROUP_ID, streamsRequest.data().groupId());
+            assertEquals(MEMBER_ID, streamsRequest.data().memberId());
+            assertEquals(MEMBER_EPOCH, streamsRequest.data().memberEpoch());
+            assertEquals(INSTANCE_ID, streamsRequest.data().instanceId());
+            assertEquals(PROCESS_ID.toString(), 
streamsRequest.data().processId());
+            assertEquals(ENDPOINT.host(), 
streamsRequest.data().userEndpoint().host());
+            assertEquals(ENDPOINT.port(), 
streamsRequest.data().userEndpoint().port());
+            assertEquals(1, streamsRequest.data().clientTags().size());
+            assertEquals(CLIENT_TAG_1, 
streamsRequest.data().clientTags().get(0).key());
+            assertEquals(VALUE_1, 
streamsRequest.data().clientTags().get(0).value());
+            assertEquals(streamsRebalanceData.topologyEpoch(), 
streamsRequest.data().topology().epoch());
+            assertNotNull(streamsRequest.data().topology());
+            final List<StreamsGroupHeartbeatRequestData.Subtopology> 
subtopologies = streamsRequest.data().topology().subtopologies();
+            assertEquals(1, subtopologies.size());
+            final StreamsGroupHeartbeatRequestData.Subtopology subtopology = 
subtopologies.get(0);
+            assertEquals(SUBTOPOLOGY_NAME_1, subtopology.subtopologyId());
+            assertEquals(Arrays.asList("sourceTopic1", "sourceTopic2"), 
subtopology.sourceTopics());
+            assertEquals(Arrays.asList("repartitionSinkTopic1", 
"repartitionSinkTopic2", "repartitionSinkTopic3"), 
subtopology.repartitionSinkTopics());
+            assertEquals(REPARTITION_SOURCE_TOPICS.size(), 
subtopology.repartitionSourceTopics().size());
+            subtopology.repartitionSourceTopics().forEach(topicInfo -> {
+                final StreamsRebalanceData.TopicInfo repartitionTopic = 
REPARTITION_SOURCE_TOPICS.get(topicInfo.name());
+                assertEquals(repartitionTopic.numPartitions().get(), 
topicInfo.partitions());
+                assertEquals(repartitionTopic.replicationFactor().get(), 
topicInfo.replicationFactor());
+                assertEquals(repartitionTopic.topicConfigs().size(), 
topicInfo.topicConfigs().size());
+            });
+            assertEquals(CHANGELOG_TOPICS.size(), 
subtopology.stateChangelogTopics().size());
+            subtopology.stateChangelogTopics().forEach(topicInfo -> {
+                assertTrue(CHANGELOG_TOPICS.containsKey(topicInfo.name()));
+                assertEquals(0, topicInfo.partitions());
+                final StreamsRebalanceData.TopicInfo changelogTopic = 
CHANGELOG_TOPICS.get(topicInfo.name());
+                assertEquals(changelogTopic.replicationFactor().get(), 
topicInfo.replicationFactor());
+                assertEquals(changelogTopic.topicConfigs().size(), 
topicInfo.topicConfigs().size());
+            });
+            assertEquals(2, subtopology.copartitionGroups().size());
+            final StreamsGroupHeartbeatRequestData.CopartitionGroup 
expectedCopartitionGroupData1 =
+                new StreamsGroupHeartbeatRequestData.CopartitionGroup()
+                    
.setRepartitionSourceTopics(Collections.singletonList((short) 0))
+                    .setSourceTopics(Collections.singletonList((short) 1));
+            final StreamsGroupHeartbeatRequestData.CopartitionGroup 
expectedCopartitionGroupData2 =
+                new StreamsGroupHeartbeatRequestData.CopartitionGroup()
+                    
.setRepartitionSourceTopics(Collections.singletonList((short) 1))
+                    .setSourceTopics(Collections.singletonList((short) 0));
+            
assertTrue(subtopology.copartitionGroups().contains(expectedCopartitionGroupData1));
+            
assertTrue(subtopology.copartitionGroups().contains(expectedCopartitionGroupData2));
+            verify(heartbeatRequestState).onSendAttempt(time.milliseconds());
+            verify(membershipManager).onHeartbeatRequestGenerated();
+            time.sleep(2000);
+            assertEquals(
+                2.0,
+                
metrics.metric(metrics.metricName("last-heartbeat-seconds-ago", 
"consumer-coordinator-metrics")).metricValue()
+            );
+            final ClientResponse response = buildClientResponse();
+            networkRequest.future().complete(response);
+            
verify(membershipManager).onHeartbeatSuccess((StreamsGroupHeartbeatResponse) 
response.responseBody());
+            
verify(heartbeatRequestState).updateHeartbeatIntervalMs(RECEIVED_HEARTBEAT_INTERVAL_MS);
+            
verify(heartbeatRequestState).onSuccessfulAttempt(networkRequest.handler().completionTimeMs());
+            verify(heartbeatRequestState).resetTimer();
+            final List<TopicPartition> topicPartitions = 
streamsRebalanceData.partitionsByHost()
+                .get(new StreamsRebalanceData.HostInfo(
+                    ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().host(),
+                    ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().port())
+                );
+            
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).topic(), 
topicPartitions.get(0).topic());
+            
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).partitions().get(0),
 topicPartitions.get(0).partition());
+            assertEquals(
+                1.0,
+                metrics.metric(metrics.metricName("heartbeat-total", 
"consumer-coordinator-metrics")).metricValue()
+            );
+        }
+    }
+
+    @Test
+    public void testSendingLeaveHeartbeatRequestWhenPollTimerExpired() {
+        try (
+            final MockedConstruction<HeartbeatRequestState> 
heartbeatRequestStateMockedConstruction = mockConstruction(
+                HeartbeatRequestState.class,
+                (mock, context) -> {
+                    
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
+                });
+            final MockedConstruction<Timer> pollTimerMockedConstruction = 
mockConstruction(
+                Timer.class,
+                (mock, context) -> {
+                    when(mock.isExpired()).thenReturn(true);
+                });
+        ) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager 
= createStreamsGroupHeartbeatRequestManager();
+            final HeartbeatRequestState heartbeatRequestState = 
heartbeatRequestStateMockedConstruction.constructed().get(0);
+            
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+            when(membershipManager.groupId()).thenReturn(GROUP_ID);
+            when(membershipManager.memberId()).thenReturn(MEMBER_ID);
+            
when(membershipManager.memberEpoch()).thenReturn(LEAVE_GROUP_MEMBER_EPOCH);
+            
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
+
+            final NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
+            assertEquals(0, result.timeUntilNextPollMs);
+            assertEquals(1, result.unsentRequests.size());
+            assertEquals(Optional.of(coordinatorNode), 
result.unsentRequests.get(0).node());
+            NetworkClientDelegate.UnsentRequest networkRequest = 
result.unsentRequests.get(0);
+            StreamsGroupHeartbeatRequest streamsRequest = 
(StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build();
+            assertEquals(GROUP_ID, streamsRequest.data().groupId());
+            assertEquals(MEMBER_ID, streamsRequest.data().memberId());
+            assertEquals(LEAVE_GROUP_MEMBER_EPOCH, 
streamsRequest.data().memberEpoch());
+            assertEquals(INSTANCE_ID, streamsRequest.data().instanceId());
+            verify(heartbeatRequestState).onSendAttempt(time.milliseconds());
+            verify(membershipManager).onHeartbeatRequestGenerated();
+            final ClientResponse response = buildClientResponse();
+            networkRequest.future().complete(response);
+            verify(heartbeatRequestState, 
never()).updateHeartbeatIntervalMs(anyLong());
+            verify(heartbeatRequestState, 
never()).onSuccessfulAttempt(anyLong());
+            verify(membershipManager, never()).onHeartbeatSuccess(any());
+        }
+    }
+
+    private static ConsumerConfig config() {
+        Properties prop = new Properties();
+        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+        prop.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
String.valueOf(DEFAULT_MAX_POLL_INTERVAL_MS));
+        return new ConsumerConfig(prop);
+    }
+
+    private StreamsGroupHeartbeatRequestManager 
createStreamsGroupHeartbeatRequestManager() {
+        return new StreamsGroupHeartbeatRequestManager(
             LOG_CONTEXT,
             time,
             config,
             coordinatorRequestManager,
             membershipManager,
+            backgroundEventHandler,
             metrics,
             streamsRebalanceData
         );
-
-        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
-        when(membershipManager.groupId()).thenReturn(GROUP_ID);
-        when(membershipManager.memberId()).thenReturn(MEMBER_ID);
-        when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
-        
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
-
-        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
-
-        assertEquals(0, result.timeUntilNextPollMs);
-        assertEquals(1, result.unsentRequests.size());
-        assertEquals(Optional.of(coordinatorNode), 
result.unsentRequests.get(0).node());
-        NetworkClientDelegate.UnsentRequest networkRequest = 
result.unsentRequests.get(0);
-        StreamsGroupHeartbeatRequest streamsRequest = 
(StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build();
-        assertEquals(GROUP_ID, streamsRequest.data().groupId());
-        assertEquals(MEMBER_ID, streamsRequest.data().memberId());
-        assertEquals(MEMBER_EPOCH, streamsRequest.data().memberEpoch());
-        assertEquals(INSTANCE_ID, streamsRequest.data().instanceId());
-        assertEquals(PROCESS_ID.toString(), streamsRequest.data().processId());
-        assertEquals(ENDPOINT.host(), 
streamsRequest.data().userEndpoint().host());
-        assertEquals(ENDPOINT.port(), 
streamsRequest.data().userEndpoint().port());
-        assertEquals(1, streamsRequest.data().clientTags().size());
-        assertEquals("clientTag1", 
streamsRequest.data().clientTags().get(0).key());
-        assertEquals("value2", 
streamsRequest.data().clientTags().get(0).value());
-        assertEquals(streamsRebalanceData.topologyEpoch(), 
streamsRequest.data().topology().epoch());
-        assertNotNull(streamsRequest.data().topology());
-        final List<StreamsGroupHeartbeatRequestData.Subtopology> subtopologies 
= streamsRequest.data().topology().subtopologies();
-        assertEquals(1, subtopologies.size());
-        final StreamsGroupHeartbeatRequestData.Subtopology subtopology = 
subtopologies.get(0);
-        assertEquals(SUBTOPOLOGY_NAME_1, subtopology.subtopologyId());
-        assertEquals(Arrays.asList("sourceTopic1", "sourceTopic2"), 
subtopology.sourceTopics());
-        assertEquals(Arrays.asList("repartitionSinkTopic1", 
"repartitionSinkTopic2", "repartitionSinkTopic3"), 
subtopology.repartitionSinkTopics());
-        assertEquals(REPARTITION_SOURCE_TOPICS.size(), 
subtopology.repartitionSourceTopics().size());
-        subtopology.repartitionSourceTopics().forEach(topicInfo -> {
-            final StreamsRebalanceData.TopicInfo repartitionTopic = 
REPARTITION_SOURCE_TOPICS.get(topicInfo.name());
-            assertEquals(repartitionTopic.numPartitions().get(), 
topicInfo.partitions());
-            assertEquals(repartitionTopic.replicationFactor().get(), 
topicInfo.replicationFactor());
-            assertEquals(repartitionTopic.topicConfigs().size(), 
topicInfo.topicConfigs().size());
-        });
-        assertEquals(CHANGELOG_TOPICS.size(), 
subtopology.stateChangelogTopics().size());
-        subtopology.stateChangelogTopics().forEach(topicInfo -> {
-            assertTrue(CHANGELOG_TOPICS.containsKey(topicInfo.name()));
-            assertEquals(0, topicInfo.partitions());
-            final StreamsRebalanceData.TopicInfo changelogTopic = 
CHANGELOG_TOPICS.get(topicInfo.name());
-            assertEquals(changelogTopic.replicationFactor().get(), 
topicInfo.replicationFactor());
-            assertEquals(changelogTopic.topicConfigs().size(), 
topicInfo.topicConfigs().size());
-        });
-        assertEquals(2, subtopology.copartitionGroups().size());
-        final StreamsGroupHeartbeatRequestData.CopartitionGroup 
expectedCopartitionGroupData1 =
-            new StreamsGroupHeartbeatRequestData.CopartitionGroup()
-                .setRepartitionSourceTopics(Collections.singletonList((short) 
0))
-                .setSourceTopics(Collections.singletonList((short) 1));
-        final StreamsGroupHeartbeatRequestData.CopartitionGroup 
expectedCopartitionGroupData2 =
-            new StreamsGroupHeartbeatRequestData.CopartitionGroup()
-                .setRepartitionSourceTopics(Collections.singletonList((short) 
1))
-                .setSourceTopics(Collections.singletonList((short) 0));
-        
assertTrue(subtopology.copartitionGroups().contains(expectedCopartitionGroupData1));
-        
assertTrue(subtopology.copartitionGroups().contains(expectedCopartitionGroupData2));
-        verify(membershipManager).onHeartbeatRequestGenerated();
-        time.sleep(2000);
-        assertEquals(
-            2.0,
-            metrics.metric(metrics.metricName("last-heartbeat-seconds-ago", 
"consumer-coordinator-metrics")).metricValue()
-        );
-        final ClientResponse response = buildClientResponse();
-        networkRequest.future().complete(response);
-        
verify(membershipManager).onHeartbeatSuccess((StreamsGroupHeartbeatResponse) 
response.responseBody());
-        final List<TopicPartition> topicPartitions = 
streamsRebalanceData.partitionsByHost()
-            .get(new StreamsRebalanceData.HostInfo(
-                ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().host(),
-                ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().port())
-            );
-        
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).topic(), 
topicPartitions.get(0).topic());
-        
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).partitions().get(0),
 topicPartitions.get(0).partition());
-        assertEquals(
-            1.0,
-            metrics.metric(metrics.metricName("heartbeat-total", 
"consumer-coordinator-metrics")).metricValue()
-        );
-        NetworkClientDelegate.PollResult nextResult = 
heartbeatRequestManager.poll(time.milliseconds());
-        assertEquals(RECEIVED_HEARTBEAT_INTERVAL_MS, 
nextResult.timeUntilNextPollMs);
     }
 
     private ClientResponse buildClientResponse() {
@@ -306,7 +647,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
             new StreamsGroupHeartbeatResponse(
                 new StreamsGroupHeartbeatResponseData()
                     .setPartitionsByUserEndpoint(ENDPOINT_TO_PARTITIONS)
-                    .setHeartbeatIntervalMs(RECEIVED_HEARTBEAT_INTERVAL_MS)
+                    .setHeartbeatIntervalMs((int) 
RECEIVED_HEARTBEAT_INTERVAL_MS)
             )
         );
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
index b9c28406706..5845da7bede 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
@@ -1929,70 +1929,70 @@ public class StreamsMembershipManagerTest {
 
     private static void verifyInStateReconciling(final 
StreamsMembershipManager membershipManager) {
         assertEquals(MemberState.RECONCILING, membershipManager.state());
-        assertFalse(membershipManager.shouldHeartbeatNow());
+        assertFalse(membershipManager.shouldNotWaitForHeartbeatInterval());
         assertFalse(membershipManager.shouldSkipHeartbeat());
         assertFalse(membershipManager.isLeavingGroup());
     }
 
     private static void verifyInStateAcknowledging(final 
StreamsMembershipManager membershipManager) {
         assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
-        assertTrue(membershipManager.shouldHeartbeatNow());
+        assertTrue(membershipManager.shouldNotWaitForHeartbeatInterval());
         assertFalse(membershipManager.shouldSkipHeartbeat());
         assertFalse(membershipManager.isLeavingGroup());
     }
 
     private static void verifyInStateLeaving(final StreamsMembershipManager 
membershipManager) {
         assertEquals(MemberState.LEAVING, membershipManager.state());
-        assertTrue(membershipManager.shouldHeartbeatNow());
+        assertTrue(membershipManager.shouldNotWaitForHeartbeatInterval());
         assertFalse(membershipManager.shouldSkipHeartbeat());
         assertTrue(membershipManager.isLeavingGroup());
     }
 
     private static void verifyInStatePrepareLeaving(final 
StreamsMembershipManager membershipManager) {
         assertEquals(MemberState.PREPARE_LEAVING, membershipManager.state());
-        assertFalse(membershipManager.shouldHeartbeatNow());
+        assertFalse(membershipManager.shouldNotWaitForHeartbeatInterval());
         assertFalse(membershipManager.shouldSkipHeartbeat());
         assertTrue(membershipManager.isLeavingGroup());
     }
 
     private static void verifyInStateUnsubscribed(final 
StreamsMembershipManager membershipManager) {
         assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
-        assertFalse(membershipManager.shouldHeartbeatNow());
+        assertFalse(membershipManager.shouldNotWaitForHeartbeatInterval());
         assertTrue(membershipManager.shouldSkipHeartbeat());
         assertFalse(membershipManager.isLeavingGroup());
     }
 
     private static void verifyInStateJoining(final StreamsMembershipManager 
membershipManager) {
         assertEquals(MemberState.JOINING, membershipManager.state());
-        assertTrue(membershipManager.shouldHeartbeatNow());
+        assertTrue(membershipManager.shouldNotWaitForHeartbeatInterval());
         assertFalse(membershipManager.shouldSkipHeartbeat());
         assertFalse(membershipManager.isLeavingGroup());
     }
 
     private static void verifyInStateStable(final StreamsMembershipManager 
membershipManager) {
         assertEquals(MemberState.STABLE, membershipManager.state());
-        assertFalse(membershipManager.shouldHeartbeatNow());
+        assertFalse(membershipManager.shouldNotWaitForHeartbeatInterval());
         assertFalse(membershipManager.shouldSkipHeartbeat());
         assertFalse(membershipManager.isLeavingGroup());
     }
 
     private static void verifyInStateFenced(final StreamsMembershipManager 
membershipManager) {
         assertEquals(MemberState.FENCED, membershipManager.state());
-        assertFalse(membershipManager.shouldHeartbeatNow());
+        assertFalse(membershipManager.shouldNotWaitForHeartbeatInterval());
         assertTrue(membershipManager.shouldSkipHeartbeat());
         assertFalse(membershipManager.isLeavingGroup());
     }
 
     private static void verifyInStateFatal(final StreamsMembershipManager 
membershipManager) {
         assertEquals(MemberState.FATAL, membershipManager.state());
-        assertFalse(membershipManager.shouldHeartbeatNow());
+        assertFalse(membershipManager.shouldNotWaitForHeartbeatInterval());
         assertTrue(membershipManager.shouldSkipHeartbeat());
         assertFalse(membershipManager.isLeavingGroup());
     }
 
     private static void verifyInStateStale(final StreamsMembershipManager 
membershipManager) {
         assertEquals(MemberState.STALE, membershipManager.state());
-        assertFalse(membershipManager.shouldHeartbeatNow());
+        assertFalse(membershipManager.shouldNotWaitForHeartbeatInterval());
         assertTrue(membershipManager.shouldSkipHeartbeat());
         assertFalse(membershipManager.isLeavingGroup());
     }

Reply via email to