This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 59e5890505b KAFKA-18736: Decide when a heartbeat should be sent
(#19121)
59e5890505b is described below
commit 59e5890505b944a7911a676371a259027f9f468d
Author: Bruno Cadonna <[email protected]>
AuthorDate: Mon Mar 10 17:39:57 2025 +0100
KAFKA-18736: Decide when a heartbeat should be sent (#19121)
This commit adds the conditions to decide when a Streams group heartbeat
should be sent.
A heartbeat should be sent when:
- the group coordinator is available
- the member is part of the Streams group or wants to join it
- the heartbeat interval expired or the member is leaving the group or
acknowledging the assignment
This commit does not implement:
- not sending fields that did not change
- handling errors
Reviewers: Zheguang Zhao <[email protected]>, Lucas
Brutschy <[email protected]>
---
.../StreamsGroupHeartbeatRequestManager.java | 142 +++++-
.../internals/StreamsMembershipManager.java | 2 +-
.../StreamsGroupHeartbeatRequestManagerTest.java | 525 +++++++++++++++++----
.../internals/StreamsMembershipManagerTest.java | 20 +-
4 files changed, 572 insertions(+), 117 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index 4e8bff80366..0f17353492e 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -18,6 +18,8 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
@@ -28,6 +30,7 @@ import
org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
@@ -43,6 +46,17 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+/**
+ * <p>Manages the request creation and response handling for the Streams group
heartbeat. The class creates a
+ * heartbeat request using the state stored in the membership manager. The
requests can be retrieved
+ * by calling {@link StreamsGroupHeartbeatRequestManager#poll(long)}. Once the
response is received, it updates the
+ * state in the membership manager and handles any errors.
+ *
+ * <p>The heartbeat manager generates heartbeat requests based on the member
state. It's also responsible
+ * for the timing of the heartbeat requests to ensure they are sent according
to the heartbeat interval
+ * (while the member state is stable) or on demand (while the member is
acknowledging an assignment or
+ * leaving the group).
+ */
public class StreamsGroupHeartbeatRequestManager implements RequestManager {
static class HeartbeatState {
@@ -59,6 +73,9 @@ public class StreamsGroupHeartbeatRequestManager implements
RequestManager {
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
}
+ public void reset() {
+ }
+
public StreamsGroupHeartbeatRequestData buildRequestData() {
StreamsGroupHeartbeatRequestData data = new
StreamsGroupHeartbeatRequestData();
data.setGroupId(membershipManager.groupId());
@@ -205,7 +222,6 @@ public class StreamsGroupHeartbeatRequestManager implements
RequestManager {
}
}
-
private final Logger logger;
private final int maxPollIntervalMs;
@@ -218,15 +234,24 @@ public class StreamsGroupHeartbeatRequestManager
implements RequestManager {
private final StreamsMembershipManager membershipManager;
+ private final BackgroundEventHandler backgroundEventHandler;
+
private final HeartbeatMetricsManager metricsManager;
private StreamsRebalanceData streamsRebalanceData;
+ /**
+ * Timer for tracking the time since the last consumer poll. If the timer
expires, the consumer will stop
+ * sending heartbeat until the next poll.
+ */
+ private final Timer pollTimer;
+
public StreamsGroupHeartbeatRequestManager(final LogContext logContext,
final Time time,
final ConsumerConfig config,
final CoordinatorRequestManager
coordinatorRequestManager,
final StreamsMembershipManager
membershipManager,
+ final BackgroundEventHandler
backgroundEventHandler,
final Metrics metrics,
final StreamsRebalanceData
streamsRebalanceData) {
this.logger = logContext.logger(getClass());
@@ -238,6 +263,10 @@ public class StreamsGroupHeartbeatRequestManager
implements RequestManager {
membershipManager,
"Streams membership manager cannot be null"
);
+ this.backgroundEventHandler = Objects.requireNonNull(
+ backgroundEventHandler,
+ "Background event handler cannot be null"
+ );
this.metricsManager = new HeartbeatMetricsManager(
Objects.requireNonNull(metrics, "Metrics cannot be null")
);
@@ -254,31 +283,119 @@ public class StreamsGroupHeartbeatRequestManager
implements RequestManager {
retryBackoffMaxMs,
maxPollIntervalMs
);
+ this.pollTimer = time.timer(maxPollIntervalMs);
}
+ /**
+ * This will build a heartbeat request if one must be sent, determined
based on the member
+ * state. A heartbeat is sent when all of the following apply:
+ * <ol>
+ * <li>Member is part of the Streams group or wants to join it.</li>
+ * <li>The heartbeat interval has expired, or the member is in a state
that indicates
+ * that it should heartbeat without waiting for the interval.</li>
+ * </ol>
+ * This will also determine the maximum wait time until the next poll
based on the member's
+ * state.
+ * <ol>
+ * <li>If the member is without a coordinator or is in a failed state,
the timer is set
+ * to Long.MAX_VALUE, as there's no need to send a heartbeat.</li>
+ * <li>If the member cannot send a heartbeat due to exponential
backoff, it will
+ * return the remaining time left on the backoff timer.</li>
+ * <li>If the member's heartbeat timer has not expired, it will return
the remaining time
+ * left on the heartbeat timer.</li>
+ * <li>If the member can send a heartbeat, the timer is set to the
current heartbeat interval.</li>
+ * </ol>
+ *
+ * @return {@link
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}
that includes a
+ * heartbeat request if one must be sent, and the time to wait
until the next poll.
+ */
@Override
public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
- return new NetworkClientDelegate.PollResult(
- heartbeatRequestState.heartbeatIntervalMs(),
- Collections.singletonList(makeHeartbeatRequest(currentTimeMs))
- );
+ if (coordinatorRequestManager.coordinator().isEmpty() ||
membershipManager.shouldSkipHeartbeat()) {
+ membershipManager.onHeartbeatRequestSkipped();
+ maybePropagateCoordinatorFatalErrorEvent();
+ return NetworkClientDelegate.PollResult.EMPTY;
+ }
+ pollTimer.update(currentTimeMs);
+ if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+ logger.warn("Consumer poll timeout has expired. This means the
time between " +
+ "subsequent calls to poll() was longer than the configured
max.poll.interval.ms, " +
+ "which typically implies that the poll loop is spending too
much time processing " +
+ "messages. You can address this either by increasing
max.poll.interval.ms or by " +
+ "reducing the maximum size of batches returned in poll() with
max.poll.records.");
+
+ membershipManager.onPollTimerExpired();
+ NetworkClientDelegate.UnsentRequest leaveHeartbeat =
makeHeartbeatRequestAndLogResponse(currentTimeMs);
+
+ // We can ignore the leave response because we can join before or
after receiving the response.
+ heartbeatRequestState.reset();
+ heartbeatState.reset();
+ return new
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(),
Collections.singletonList(leaveHeartbeat));
+ }
+ if (shouldHeartbeatBeforeIntervalExpires() ||
heartbeatRequestState.canSendRequest(currentTimeMs)) {
+ NetworkClientDelegate.UnsentRequest request =
makeHeartbeatRequestAndHandleResponse(currentTimeMs);
+ return new
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(),
Collections.singletonList(request));
+ } else {
+ return new
NetworkClientDelegate.PollResult(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
+ }
}
- private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final
long currentTimeMs) {
- NetworkClientDelegate.UnsentRequest request = new
NetworkClientDelegate.UnsentRequest(
- new
StreamsGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
- coordinatorRequestManager.coordinator()
- );
- request.whenComplete((response, exception) -> {
+ /**
+ * A heartbeat should be sent without waiting for the heartbeat interval
to expire if:
+ * - the member is leaving the group
+ * or
+ * - the member is joining the group or acknowledging the assignment and
for both cases there is no heartbeat request
+ * in flight.
+ *
+ * @return true if a heartbeat should be sent before the interval expires,
false otherwise
+ */
+ private boolean shouldHeartbeatBeforeIntervalExpires() {
+ return membershipManager.state() == MemberState.LEAVING
+ ||
+ (membershipManager.state() == MemberState.JOINING ||
membershipManager.state() == MemberState.ACKNOWLEDGING)
+ && !heartbeatRequestState.requestInFlight();
+ }
+
+ private void maybePropagateCoordinatorFatalErrorEvent() {
+ coordinatorRequestManager.getAndClearFatalError()
+ .ifPresent(fatalError -> backgroundEventHandler.add(new
ErrorEvent(fatalError)));
+ }
+
+ private NetworkClientDelegate.UnsentRequest
makeHeartbeatRequestAndLogResponse(final long currentTimeMs) {
+ return makeHeartbeatRequest(currentTimeMs).whenComplete((response,
exception) -> {
+ if (response != null) {
+
metricsManager.recordRequestLatency(response.requestLatencyMs());
+ Errors error = Errors.forCode(((StreamsGroupHeartbeatResponse)
response.responseBody()).data().errorCode());
+ if (error == Errors.NONE)
+ logger.debug("StreamsGroupHeartbeatRequest responded
successfully: {}", response);
+ else
+ logger.error("StreamsGroupHeartbeatRequest failed because
of {}: {}", error, response);
+ } else {
+ logger.error("StreamsGroupHeartbeatRequest failed because of
unexpected exception.", exception);
+ }
+ });
+ }
+
+ private NetworkClientDelegate.UnsentRequest
makeHeartbeatRequestAndHandleResponse(final long currentTimeMs) {
+ NetworkClientDelegate.UnsentRequest request =
makeHeartbeatRequest(currentTimeMs);
+ return request.whenComplete((response, exception) -> {
long completionTimeMs = request.handler().completionTimeMs();
if (response != null) {
metricsManager.recordRequestLatency(response.requestLatencyMs());
onResponse((StreamsGroupHeartbeatResponse)
response.responseBody(), completionTimeMs);
}
});
+ }
+
+ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final
long currentTimeMs) {
+ NetworkClientDelegate.UnsentRequest request = new
NetworkClientDelegate.UnsentRequest(
+ new
StreamsGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
+ coordinatorRequestManager.coordinator()
+ );
heartbeatRequestState.onSendAttempt(currentTimeMs);
membershipManager.onHeartbeatRequestGenerated();
metricsManager.recordHeartbeatSentMs(currentTimeMs);
+ heartbeatRequestState.resetTimer();
return request;
}
@@ -290,17 +407,14 @@ public class StreamsGroupHeartbeatRequestManager
implements RequestManager {
private void onSuccessResponse(final StreamsGroupHeartbeatResponse
response, final long currentTimeMs) {
final StreamsGroupHeartbeatResponseData data = response.data();
-
heartbeatRequestState.updateHeartbeatIntervalMs(data.heartbeatIntervalMs());
heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
- heartbeatRequestState.resetTimer();
if (data.partitionsByUserEndpoint() != null) {
streamsRebalanceData.setPartitionsByHost(convertHostInfoMap(data));
}
List<StreamsGroupHeartbeatResponseData.Status> statuses =
data.status();
-
if (statuses != null && !statuses.isEmpty()) {
String statusDetails = statuses.stream()
.map(status -> "(" + status.statusCode() + ") " +
status.statusDetail())
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
index 0d281cd82d9..f5f269f52b5 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
@@ -591,7 +591,7 @@ public class StreamsMembershipManager implements
RequestManager {
* @return True if the member should send heartbeat to the coordinator
without waiting for
* the interval.
*/
- public boolean shouldHeartbeatNow() {
+ public boolean shouldNotWaitForHeartbeatInterval() {
return state == MemberState.ACKNOWLEDGING || state ==
MemberState.LEAVING || state == MemberState.JOINING;
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index f0c0f6f207f..47b27d52e29 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -18,6 +18,8 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
@@ -31,10 +33,15 @@ import
org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
+import org.mockito.MockedConstruction;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Arrays;
@@ -47,10 +54,16 @@ import java.util.Properties;
import java.util.Set;
import java.util.UUID;
+import static
org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -58,7 +71,7 @@ import static org.mockito.Mockito.when;
class StreamsGroupHeartbeatRequestManagerTest {
private static final LogContext LOG_CONTEXT = new LogContext("test");
- private static final int RECEIVED_HEARTBEAT_INTERVAL_MS = 1200;
+ private static final long RECEIVED_HEARTBEAT_INTERVAL_MS = 1200;
private static final int DEFAULT_MAX_POLL_INTERVAL_MS = 10000;
private static final String GROUP_ID = "group-id";
private static final String MEMBER_ID = "member-id";
@@ -105,7 +118,9 @@ class StreamsGroupHeartbeatRequestManagerTest {
);
private static final Map<String, StreamsRebalanceData.Subtopology>
SUBTOPOLOGIES =
Map.of(SUBTOPOLOGY_NAME_1, SUBTOPOLOGY_1);
- private static final Map<String, String> CLIENT_TAGS =
Map.of("clientTag1", "value2");
+ private static final String CLIENT_TAG_1 = "client-tag1";
+ private static final String VALUE_1 = "value1";
+ private static final Map<String, String> CLIENT_TAGS =
Map.of(CLIENT_TAG_1, VALUE_1);
private static final
List<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
ENDPOINT_TO_PARTITIONS =
List.of(
new StreamsGroupHeartbeatResponseData.EndpointToPartitions()
@@ -132,18 +147,13 @@ class StreamsGroupHeartbeatRequestManagerTest {
@Mock
private StreamsMembershipManager membershipManager;
+ @Mock
+ private BackgroundEventHandler backgroundEventHandler;
+
private final Metrics metrics = new Metrics(time);
private final Node coordinatorNode = new Node(1, "localhost", 9092);
- private static ConsumerConfig config() {
- Properties prop = new Properties();
- prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
- prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
- prop.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
String.valueOf(DEFAULT_MAX_POLL_INTERVAL_MS));
- return new ConsumerConfig(prop);
- }
-
@Test
public void testConstructWithNullCoordinatorRequestManager() {
final Exception exception = assertThrows(NullPointerException.class,
() -> new StreamsGroupHeartbeatRequestManager(
@@ -152,6 +162,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
config,
null,
membershipManager,
+ backgroundEventHandler,
metrics,
streamsRebalanceData
));
@@ -166,12 +177,28 @@ class StreamsGroupHeartbeatRequestManagerTest {
config,
coordinatorRequestManager,
null,
+ backgroundEventHandler,
metrics,
streamsRebalanceData
));
assertEquals("Streams membership manager cannot be null",
exception.getMessage());
}
+ @Test
+ public void testConstructWithNullBackgroundEventHandler() {
+ final Exception exception = assertThrows(NullPointerException.class,
() -> new StreamsGroupHeartbeatRequestManager(
+ new LogContext("test"),
+ time,
+ config,
+ coordinatorRequestManager,
+ membershipManager,
+ null,
+ metrics,
+ streamsRebalanceData
+ ));
+ assertEquals("Background event handler cannot be null",
exception.getMessage());
+ }
+
@Test
public void testConstructWithNullMetrics() {
final Exception exception = assertThrows(NullPointerException.class,
() -> new StreamsGroupHeartbeatRequestManager(
@@ -180,6 +207,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
config,
coordinatorRequestManager,
membershipManager,
+ backgroundEventHandler,
null,
streamsRebalanceData
));
@@ -194,103 +222,416 @@ class StreamsGroupHeartbeatRequestManagerTest {
config,
coordinatorRequestManager,
membershipManager,
+ backgroundEventHandler,
metrics,
null
));
assertEquals("Streams rebalance data cannot be null",
exception.getMessage());
}
+ @Test
+ public void testNoHeartbeatIfCoordinatorUnknown() {
+ try (final MockedConstruction<Timer> pollTimerMockedConstruction =
mockConstruction(Timer.class)) {
+ final StreamsGroupHeartbeatRequestManager heartbeatRequestManager
= createStreamsGroupHeartbeatRequestManager();
+ final Timer pollTimer =
pollTimerMockedConstruction.constructed().get(0);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
+
+ final NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+
+ assertEquals(0, result.unsentRequests.size());
+ verify(membershipManager).onHeartbeatRequestSkipped();
+ verify(pollTimer, never()).update();
+ }
+ }
+
+ @Test
+ public void testNoHeartbeatIfHeartbeatSkipped() {
+ try (final MockedConstruction<Timer> pollTimerMockedConstruction =
mockConstruction(Timer.class)) {
+ final StreamsGroupHeartbeatRequestManager heartbeatRequestManager
= createStreamsGroupHeartbeatRequestManager();
+ final Timer pollTimer =
pollTimerMockedConstruction.constructed().get(0);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+ when(membershipManager.shouldSkipHeartbeat()).thenReturn(true);
+
+ final NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+
+ assertEquals(0, result.unsentRequests.size());
+ verify(membershipManager).onHeartbeatRequestSkipped();
+ verify(pollTimer, never()).update();
+ }
+ }
+
+ @Test
+ public void testPropagateCoordinatorFatalErrorToApplicationThread() {
+ final StreamsGroupHeartbeatRequestManager heartbeatRequestManager =
createStreamsGroupHeartbeatRequestManager();
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
+ final Throwable fatalError = new RuntimeException("KABOOM");
+
when(coordinatorRequestManager.getAndClearFatalError()).thenReturn(Optional.of(fatalError));
+
+ final NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+
+ assertEquals(0, result.unsentRequests.size());
+ verify(membershipManager).onHeartbeatRequestSkipped();
+ verify(backgroundEventHandler).add(argThat(
+ errorEvent -> errorEvent instanceof ErrorEvent && ((ErrorEvent)
errorEvent).error() == fatalError));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testSendingHeartbeatIfMemberIsLeaving(final boolean
requestInFlight) {
+ final long heartbeatIntervalMs = 1234;
+ try (
+ final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
+ HeartbeatRequestState.class,
+ (mock, context) -> {
+
when(mock.canSendRequest(time.milliseconds())).thenReturn(false);
+
when(mock.heartbeatIntervalMs()).thenReturn(heartbeatIntervalMs);
+ when(mock.requestInFlight()).thenReturn(requestInFlight);
+ });
+ final MockedConstruction<Timer> pollTimerMockedConstruction =
mockConstruction(Timer.class)
+ ) {
+ final StreamsGroupHeartbeatRequestManager heartbeatRequestManager
= createStreamsGroupHeartbeatRequestManager();
+ final Timer pollTimer =
pollTimerMockedConstruction.constructed().get(0);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+ when(membershipManager.state()).thenReturn(MemberState.LEAVING);
+
+ final NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+
+ assertEquals(1, result.unsentRequests.size());
+ assertEquals(heartbeatIntervalMs, result.timeUntilNextPollMs);
+ verify(pollTimer).update(time.milliseconds());
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = MemberState.class, names = {"JOINING",
"ACKNOWLEDGING"})
+ public void testSendingHeartbeatIfMemberIsJoiningOrAcknowledging(final
MemberState memberState) {
+ final long heartbeatIntervalMs = 1234;
+ try (
+ final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
+ HeartbeatRequestState.class,
+ (mock, context) -> {
+
when(mock.canSendRequest(time.milliseconds())).thenReturn(false);
+
when(mock.heartbeatIntervalMs()).thenReturn(heartbeatIntervalMs);
+ });
+ final MockedConstruction<Timer> pollTimerMockedConstruction =
mockConstruction(Timer.class)
+ ) {
+ final StreamsGroupHeartbeatRequestManager heartbeatRequestManager
= createStreamsGroupHeartbeatRequestManager();
+ final Timer pollTimer =
pollTimerMockedConstruction.constructed().get(0);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+ when(membershipManager.state()).thenReturn(memberState);
+
+ final NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+
+ assertEquals(1, result.unsentRequests.size());
+ assertEquals(heartbeatIntervalMs, result.timeUntilNextPollMs);
+ verify(pollTimer).update(time.milliseconds());
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = MemberState.class, names = {"JOINING",
"ACKNOWLEDGING"})
+ public void
testNotSendingHeartbeatIfMemberIsJoiningOrAcknowledgingWhenHeartbeatInFlight(final
MemberState memberState) {
+ final long timeToNextHeartbeatMs = 1234;
+ try (
+ final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
+ HeartbeatRequestState.class,
+ (mock, context) -> {
+
when(mock.canSendRequest(time.milliseconds())).thenReturn(false);
+
when(mock.timeToNextHeartbeatMs(time.milliseconds())).thenReturn(timeToNextHeartbeatMs);
+ when(mock.requestInFlight()).thenReturn(true);
+ });
+ final MockedConstruction<Timer> pollTimerMockedConstruction =
mockConstruction(Timer.class)
+ ) {
+ final StreamsGroupHeartbeatRequestManager heartbeatRequestManager
= createStreamsGroupHeartbeatRequestManager();
+ final Timer pollTimer =
pollTimerMockedConstruction.constructed().get(0);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+ when(membershipManager.state()).thenReturn(memberState);
+
+ final NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+
+ assertEquals(0, result.unsentRequests.size());
+ assertEquals(timeToNextHeartbeatMs, result.timeUntilNextPollMs);
+ verify(pollTimer).update(time.milliseconds());
+ }
+ }
+
+ @Test
+ public void testSendingHeartbeatIfHeartbeatCanBeSent() {
+ final long heartbeatIntervalMs = 1234;
+ try (
+ final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
+ HeartbeatRequestState.class,
+ (mock, context) -> {
+
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
+
when(mock.heartbeatIntervalMs()).thenReturn(heartbeatIntervalMs);
+
+ });
+ final MockedConstruction<Timer> pollTimerMockedConstruction =
mockConstruction(Timer.class)
+ ) {
+ final StreamsGroupHeartbeatRequestManager heartbeatRequestManager
= createStreamsGroupHeartbeatRequestManager();
+ final Timer pollTimer =
pollTimerMockedConstruction.constructed().get(0);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+ when(membershipManager.state()).thenReturn(MemberState.STABLE);
+
+ final NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+
+ assertEquals(1, result.unsentRequests.size());
+ assertEquals(heartbeatIntervalMs, result.timeUntilNextPollMs);
+ verify(pollTimer).update(time.milliseconds());
+ }
+ }
+
+ @Test
+ public void testNotSendingHeartbeatIfHeartbeatCannotBeSent() {
+ final long timeToNextHeartbeatMs = 1234;
+ try (
+ final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
+ HeartbeatRequestState.class,
+ (mock, context) -> {
+
when(mock.canSendRequest(time.milliseconds())).thenReturn(false);
+
when(mock.timeToNextHeartbeatMs(time.milliseconds())).thenReturn(timeToNextHeartbeatMs);
+ });
+ final MockedConstruction<Timer> pollTimerMockedConstruction =
mockConstruction(Timer.class)
+ ) {
+ final StreamsGroupHeartbeatRequestManager heartbeatRequestManager
= createStreamsGroupHeartbeatRequestManager();
+ final Timer pollTimer =
pollTimerMockedConstruction.constructed().get(0);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+
+ final NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+
+ assertEquals(0, result.unsentRequests.size());
+ assertEquals(timeToNextHeartbeatMs, result.timeUntilNextPollMs);
+ verify(pollTimer).update(time.milliseconds());
+ }
+ }
+
+ @Test
+ public void testSendingLeaveHeartbeatIfPollTimerExpired() {
+ final long heartbeatIntervalMs = 1234;
+ try (
+ final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
+ HeartbeatRequestState.class,
+ (mock, context) -> {
+
when(mock.heartbeatIntervalMs()).thenReturn(heartbeatIntervalMs);
+ });
+ final MockedConstruction<Timer> pollTimerMockedConstruction =
mockConstruction(
+ Timer.class,
+ (mock, context) -> {
+ when(mock.isExpired()).thenReturn(true);
+ });
+ final
MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState>
heartbeatStateMockedConstruction = mockConstruction(
+ StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
+ ) {
+ final StreamsGroupHeartbeatRequestManager heartbeatRequestManager
= createStreamsGroupHeartbeatRequestManager();
+ final HeartbeatRequestState heartbeatRequestState =
heartbeatRequestStateMockedConstruction.constructed().get(0);
+ final StreamsGroupHeartbeatRequestManager.HeartbeatState
heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
+ final Timer pollTimer =
pollTimerMockedConstruction.constructed().get(0);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+
+ final NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+
+ assertEquals(1, result.unsentRequests.size());
+ assertEquals(heartbeatIntervalMs, result.timeUntilNextPollMs);
+ verify(pollTimer).update(time.milliseconds());
+ verify(membershipManager).onPollTimerExpired();
+ verify(heartbeatRequestState).reset();
+ verify(heartbeatState).reset();
+ }
+ }
+
+ @Test
+ public void
testNotSendingLeaveHeartbeatIfPollTimerExpiredAndMemberIsLeaving() {
+ final long timeToNextHeartbeatMs = 1234;
+ try (
+ final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
+ HeartbeatRequestState.class,
+ (mock, context) -> {
+
when(mock.timeToNextHeartbeatMs(time.milliseconds())).thenReturn(timeToNextHeartbeatMs);
+ });
+ final MockedConstruction<Timer> pollTimerMockedConstruction =
mockConstruction(
+ Timer.class,
+ (mock, context) -> {
+ when(mock.isExpired()).thenReturn(true);
+ });
+ final
MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState>
heartbeatStateMockedConstruction = mockConstruction(
+ StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
+ ) {
+ final StreamsGroupHeartbeatRequestManager heartbeatRequestManager
= createStreamsGroupHeartbeatRequestManager();
+ final HeartbeatRequestState heartbeatRequestState =
heartbeatRequestStateMockedConstruction.constructed().get(0);
+ final StreamsGroupHeartbeatRequestManager.HeartbeatState
heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
+ final Timer pollTimer =
pollTimerMockedConstruction.constructed().get(0);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+ when(membershipManager.isLeavingGroup()).thenReturn(true);
+
when(membershipManager.state()).thenReturn(MemberState.PREPARE_LEAVING);
+
+ final NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+
+ assertEquals(0, result.unsentRequests.size());
+ assertEquals(timeToNextHeartbeatMs, result.timeUntilNextPollMs);
+ verify(pollTimer).update(time.milliseconds());
+ verify(membershipManager, never()).onPollTimerExpired();
+ verify(heartbeatRequestState, never()).reset();
+ verify(heartbeatState, never()).reset();
+ }
+ }
+ /**
+ * Verifies a complete heartbeat round trip when the coordinator is known and
+ * HeartbeatRequestState.canSendRequest(now) is stubbed to true:
+ * - poll() yields exactly one request addressed to the coordinator;
+ * - the request carries group id, member id, member epoch, instance id,
+ *   process id, user endpoint, client tags and the full topology
+ *   (subtopology, source/repartition/changelog topics, copartition groups);
+ * - onSendAttempt() and onHeartbeatRequestGenerated() are invoked;
+ * - the heartbeat metrics are updated;
+ * - completing the request future calls onHeartbeatSuccess(), updates the
+ *   heartbeat interval, records the successful attempt, resets the timer and
+ *   fills streamsRebalanceData.partitionsByHost().
+ */
@Test
public void testSendingFullHeartbeatRequest() {
- final StreamsGroupHeartbeatRequestManager heartbeatRequestManager =
new StreamsGroupHeartbeatRequestManager(
+ try (
+ final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
+ HeartbeatRequestState.class,
+ (mock, context) -> {
+
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
+ })
+ ) {
+ final StreamsGroupHeartbeatRequestManager heartbeatRequestManager
= createStreamsGroupHeartbeatRequestManager();
+ final HeartbeatRequestState heartbeatRequestState =
heartbeatRequestStateMockedConstruction.constructed().get(0);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+ when(membershipManager.groupId()).thenReturn(GROUP_ID);
+ when(membershipManager.memberId()).thenReturn(MEMBER_ID);
+ when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
+
+ final NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+
+ assertEquals(0, result.timeUntilNextPollMs);
+ assertEquals(1, result.unsentRequests.size());
+ assertEquals(Optional.of(coordinatorNode),
result.unsentRequests.get(0).node());
+ NetworkClientDelegate.UnsentRequest networkRequest =
result.unsentRequests.get(0);
+ StreamsGroupHeartbeatRequest streamsRequest =
(StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build();
+ assertEquals(GROUP_ID, streamsRequest.data().groupId());
+ assertEquals(MEMBER_ID, streamsRequest.data().memberId());
+ assertEquals(MEMBER_EPOCH, streamsRequest.data().memberEpoch());
+ assertEquals(INSTANCE_ID, streamsRequest.data().instanceId());
+ assertEquals(PROCESS_ID.toString(),
streamsRequest.data().processId());
+ assertEquals(ENDPOINT.host(),
streamsRequest.data().userEndpoint().host());
+ assertEquals(ENDPOINT.port(),
streamsRequest.data().userEndpoint().port());
+ assertEquals(1, streamsRequest.data().clientTags().size());
+ assertEquals(CLIENT_TAG_1,
streamsRequest.data().clientTags().get(0).key());
+ assertEquals(VALUE_1,
streamsRequest.data().clientTags().get(0).value());
+ // Null-check the topology before its first dereference; otherwise a
+ // missing topology shows up as an NPE and this assertion can never fail.
+ assertNotNull(streamsRequest.data().topology());
+ assertEquals(streamsRebalanceData.topologyEpoch(),
streamsRequest.data().topology().epoch());
+ final List<StreamsGroupHeartbeatRequestData.Subtopology>
subtopologies = streamsRequest.data().topology().subtopologies();
+ assertEquals(1, subtopologies.size());
+ final StreamsGroupHeartbeatRequestData.Subtopology subtopology =
subtopologies.get(0);
+ assertEquals(SUBTOPOLOGY_NAME_1, subtopology.subtopologyId());
+ assertEquals(Arrays.asList("sourceTopic1", "sourceTopic2"),
subtopology.sourceTopics());
+ assertEquals(Arrays.asList("repartitionSinkTopic1",
"repartitionSinkTopic2", "repartitionSinkTopic3"),
subtopology.repartitionSinkTopics());
+ assertEquals(REPARTITION_SOURCE_TOPICS.size(),
subtopology.repartitionSourceTopics().size());
+ subtopology.repartitionSourceTopics().forEach(topicInfo -> {
+ final StreamsRebalanceData.TopicInfo repartitionTopic =
REPARTITION_SOURCE_TOPICS.get(topicInfo.name());
+ assertEquals(repartitionTopic.numPartitions().get(),
topicInfo.partitions());
+ assertEquals(repartitionTopic.replicationFactor().get(),
topicInfo.replicationFactor());
+ assertEquals(repartitionTopic.topicConfigs().size(),
topicInfo.topicConfigs().size());
+ });
+ assertEquals(CHANGELOG_TOPICS.size(),
subtopology.stateChangelogTopics().size());
+ subtopology.stateChangelogTopics().forEach(topicInfo -> {
+ assertTrue(CHANGELOG_TOPICS.containsKey(topicInfo.name()));
+ assertEquals(0, topicInfo.partitions());
+ final StreamsRebalanceData.TopicInfo changelogTopic =
CHANGELOG_TOPICS.get(topicInfo.name());
+ assertEquals(changelogTopic.replicationFactor().get(),
topicInfo.replicationFactor());
+ assertEquals(changelogTopic.topicConfigs().size(),
topicInfo.topicConfigs().size());
+ });
+ assertEquals(2, subtopology.copartitionGroups().size());
+ final StreamsGroupHeartbeatRequestData.CopartitionGroup
expectedCopartitionGroupData1 =
+ new StreamsGroupHeartbeatRequestData.CopartitionGroup()
+
.setRepartitionSourceTopics(Collections.singletonList((short) 0))
+ .setSourceTopics(Collections.singletonList((short) 1));
+ final StreamsGroupHeartbeatRequestData.CopartitionGroup
expectedCopartitionGroupData2 =
+ new StreamsGroupHeartbeatRequestData.CopartitionGroup()
+
.setRepartitionSourceTopics(Collections.singletonList((short) 1))
+ .setSourceTopics(Collections.singletonList((short) 0));
+
assertTrue(subtopology.copartitionGroups().contains(expectedCopartitionGroupData1));
+
assertTrue(subtopology.copartitionGroups().contains(expectedCopartitionGroupData2));
+ verify(heartbeatRequestState).onSendAttempt(time.milliseconds());
+ verify(membershipManager).onHeartbeatRequestGenerated();
+ // Advance the clock by two seconds so the "seconds ago" metric has a known value.
+ time.sleep(2000);
+ assertEquals(
+ 2.0,
+
metrics.metric(metrics.metricName("last-heartbeat-seconds-ago",
"consumer-coordinator-metrics")).metricValue()
+ );
+ // Complete the request and check that the response is propagated.
+ final ClientResponse response = buildClientResponse();
+ networkRequest.future().complete(response);
+
verify(membershipManager).onHeartbeatSuccess((StreamsGroupHeartbeatResponse)
response.responseBody());
+
verify(heartbeatRequestState).updateHeartbeatIntervalMs(RECEIVED_HEARTBEAT_INTERVAL_MS);
+
verify(heartbeatRequestState).onSuccessfulAttempt(networkRequest.handler().completionTimeMs());
+ verify(heartbeatRequestState).resetTimer();
+ final List<TopicPartition> topicPartitions =
streamsRebalanceData.partitionsByHost()
+ .get(new StreamsRebalanceData.HostInfo(
+ ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().host(),
+ ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().port())
+ );
+
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).topic(),
topicPartitions.get(0).topic());
+
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).partitions().get(0),
topicPartitions.get(0).partition());
+ assertEquals(
+ 1.0,
+ metrics.metric(metrics.metricName("heartbeat-total",
"consumer-coordinator-metrics")).metricValue()
+ );
+ }
+ }
+ /**
+ * With the poll timer expired (Timer.isExpired() stubbed to true), sending
+ * allowed (canSendRequest(now) stubbed to true) and the member epoch stubbed to
+ * LEAVE_GROUP_MEMBER_EPOCH, poll() must emit exactly one heartbeat to the
+ * coordinator carrying that epoch, and record the send attempt.
+ * Completing the request future afterwards must not reach
+ * updateHeartbeatIntervalMs(), onSuccessfulAttempt() or onHeartbeatSuccess().
+ * NOTE(review): presumably this heartbeat is built without the regular
+ * response handling; confirm against StreamsGroupHeartbeatRequestManager.
+ */
+ @Test
+ public void testSendingLeaveHeartbeatRequestWhenPollTimerExpired() {
+ try (
+ final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
+ HeartbeatRequestState.class,
+ (mock, context) -> {
+
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
+ });
+ final MockedConstruction<Timer> pollTimerMockedConstruction =
mockConstruction(
+ Timer.class,
+ (mock, context) -> {
+ when(mock.isExpired()).thenReturn(true);
+ });
+ ) {
+ final StreamsGroupHeartbeatRequestManager heartbeatRequestManager
= createStreamsGroupHeartbeatRequestManager();
+ final HeartbeatRequestState heartbeatRequestState =
heartbeatRequestStateMockedConstruction.constructed().get(0);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+ when(membershipManager.groupId()).thenReturn(GROUP_ID);
+ when(membershipManager.memberId()).thenReturn(MEMBER_ID);
+
when(membershipManager.memberEpoch()).thenReturn(LEAVE_GROUP_MEMBER_EPOCH);
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
+ // Single poll at the current value of time.milliseconds().
+ final NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+ // One request to the coordinator, carrying the stubbed identity and epoch.
+ assertEquals(0, result.timeUntilNextPollMs);
+ assertEquals(1, result.unsentRequests.size());
+ assertEquals(Optional.of(coordinatorNode),
result.unsentRequests.get(0).node());
+ NetworkClientDelegate.UnsentRequest networkRequest =
result.unsentRequests.get(0);
+ StreamsGroupHeartbeatRequest streamsRequest =
(StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build();
+ assertEquals(GROUP_ID, streamsRequest.data().groupId());
+ assertEquals(MEMBER_ID, streamsRequest.data().memberId());
+ assertEquals(LEAVE_GROUP_MEMBER_EPOCH,
streamsRequest.data().memberEpoch());
+ assertEquals(INSTANCE_ID, streamsRequest.data().instanceId());
+ verify(heartbeatRequestState).onSendAttempt(time.milliseconds());
+ verify(membershipManager).onHeartbeatRequestGenerated();
+ final ClientResponse response = buildClientResponse();
+ networkRequest.future().complete(response);
+ verify(heartbeatRequestState,
never()).updateHeartbeatIntervalMs(anyLong());
+ verify(heartbeatRequestState,
never()).onSuccessfulAttempt(anyLong());
+ verify(membershipManager, never()).onHeartbeatSuccess(any());
+ }
+ }
+ /**
+ * Builds the ConsumerConfig used by the tests: StringDeserializer for both
+ * keys and values, and MAX_POLL_INTERVAL_MS_CONFIG set to
+ * DEFAULT_MAX_POLL_INTERVAL_MS (passed as a string via setProperty).
+ */
+ private static ConsumerConfig config() {
+ Properties prop = new Properties();
+ prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
+ prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
+ prop.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
String.valueOf(DEFAULT_MAX_POLL_INTERVAL_MS));
+ return new ConsumerConfig(prop);
+ }
+ /**
+ * Creates the StreamsGroupHeartbeatRequestManager under test from the
+ * fixture's collaborators (log context, time, config, coordinator request
+ * manager, membership manager, background event handler, metrics and
+ * rebalance data). Tests call this inside their mockConstruction scope and
+ * then read the constructed mocks via constructed().get(0).
+ */
+ private StreamsGroupHeartbeatRequestManager
createStreamsGroupHeartbeatRequestManager() {
+ return new StreamsGroupHeartbeatRequestManager(
LOG_CONTEXT,
time,
config,
coordinatorRequestManager,
membershipManager,
+ backgroundEventHandler,
metrics,
streamsRebalanceData
);
-
-
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
- when(membershipManager.groupId()).thenReturn(GROUP_ID);
- when(membershipManager.memberId()).thenReturn(MEMBER_ID);
- when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
-
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
-
- NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
-
- assertEquals(0, result.timeUntilNextPollMs);
- assertEquals(1, result.unsentRequests.size());
- assertEquals(Optional.of(coordinatorNode),
result.unsentRequests.get(0).node());
- NetworkClientDelegate.UnsentRequest networkRequest =
result.unsentRequests.get(0);
- StreamsGroupHeartbeatRequest streamsRequest =
(StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build();
- assertEquals(GROUP_ID, streamsRequest.data().groupId());
- assertEquals(MEMBER_ID, streamsRequest.data().memberId());
- assertEquals(MEMBER_EPOCH, streamsRequest.data().memberEpoch());
- assertEquals(INSTANCE_ID, streamsRequest.data().instanceId());
- assertEquals(PROCESS_ID.toString(), streamsRequest.data().processId());
- assertEquals(ENDPOINT.host(),
streamsRequest.data().userEndpoint().host());
- assertEquals(ENDPOINT.port(),
streamsRequest.data().userEndpoint().port());
- assertEquals(1, streamsRequest.data().clientTags().size());
- assertEquals("clientTag1",
streamsRequest.data().clientTags().get(0).key());
- assertEquals("value2",
streamsRequest.data().clientTags().get(0).value());
- assertEquals(streamsRebalanceData.topologyEpoch(),
streamsRequest.data().topology().epoch());
- assertNotNull(streamsRequest.data().topology());
- final List<StreamsGroupHeartbeatRequestData.Subtopology> subtopologies
= streamsRequest.data().topology().subtopologies();
- assertEquals(1, subtopologies.size());
- final StreamsGroupHeartbeatRequestData.Subtopology subtopology =
subtopologies.get(0);
- assertEquals(SUBTOPOLOGY_NAME_1, subtopology.subtopologyId());
- assertEquals(Arrays.asList("sourceTopic1", "sourceTopic2"),
subtopology.sourceTopics());
- assertEquals(Arrays.asList("repartitionSinkTopic1",
"repartitionSinkTopic2", "repartitionSinkTopic3"),
subtopology.repartitionSinkTopics());
- assertEquals(REPARTITION_SOURCE_TOPICS.size(),
subtopology.repartitionSourceTopics().size());
- subtopology.repartitionSourceTopics().forEach(topicInfo -> {
- final StreamsRebalanceData.TopicInfo repartitionTopic =
REPARTITION_SOURCE_TOPICS.get(topicInfo.name());
- assertEquals(repartitionTopic.numPartitions().get(),
topicInfo.partitions());
- assertEquals(repartitionTopic.replicationFactor().get(),
topicInfo.replicationFactor());
- assertEquals(repartitionTopic.topicConfigs().size(),
topicInfo.topicConfigs().size());
- });
- assertEquals(CHANGELOG_TOPICS.size(),
subtopology.stateChangelogTopics().size());
- subtopology.stateChangelogTopics().forEach(topicInfo -> {
- assertTrue(CHANGELOG_TOPICS.containsKey(topicInfo.name()));
- assertEquals(0, topicInfo.partitions());
- final StreamsRebalanceData.TopicInfo changelogTopic =
CHANGELOG_TOPICS.get(topicInfo.name());
- assertEquals(changelogTopic.replicationFactor().get(),
topicInfo.replicationFactor());
- assertEquals(changelogTopic.topicConfigs().size(),
topicInfo.topicConfigs().size());
- });
- assertEquals(2, subtopology.copartitionGroups().size());
- final StreamsGroupHeartbeatRequestData.CopartitionGroup
expectedCopartitionGroupData1 =
- new StreamsGroupHeartbeatRequestData.CopartitionGroup()
- .setRepartitionSourceTopics(Collections.singletonList((short)
0))
- .setSourceTopics(Collections.singletonList((short) 1));
- final StreamsGroupHeartbeatRequestData.CopartitionGroup
expectedCopartitionGroupData2 =
- new StreamsGroupHeartbeatRequestData.CopartitionGroup()
- .setRepartitionSourceTopics(Collections.singletonList((short)
1))
- .setSourceTopics(Collections.singletonList((short) 0));
-
assertTrue(subtopology.copartitionGroups().contains(expectedCopartitionGroupData1));
-
assertTrue(subtopology.copartitionGroups().contains(expectedCopartitionGroupData2));
- verify(membershipManager).onHeartbeatRequestGenerated();
- time.sleep(2000);
- assertEquals(
- 2.0,
- metrics.metric(metrics.metricName("last-heartbeat-seconds-ago",
"consumer-coordinator-metrics")).metricValue()
- );
- final ClientResponse response = buildClientResponse();
- networkRequest.future().complete(response);
-
verify(membershipManager).onHeartbeatSuccess((StreamsGroupHeartbeatResponse)
response.responseBody());
- final List<TopicPartition> topicPartitions =
streamsRebalanceData.partitionsByHost()
- .get(new StreamsRebalanceData.HostInfo(
- ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().host(),
- ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().port())
- );
-
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).topic(),
topicPartitions.get(0).topic());
-
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).partitions().get(0),
topicPartitions.get(0).partition());
- assertEquals(
- 1.0,
- metrics.metric(metrics.metricName("heartbeat-total",
"consumer-coordinator-metrics")).metricValue()
- );
- NetworkClientDelegate.PollResult nextResult =
heartbeatRequestManager.poll(time.milliseconds());
- assertEquals(RECEIVED_HEARTBEAT_INTERVAL_MS,
nextResult.timeUntilNextPollMs);
}
private ClientResponse buildClientResponse() {
@@ -306,7 +647,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
new StreamsGroupHeartbeatResponse(
new StreamsGroupHeartbeatResponseData()
.setPartitionsByUserEndpoint(ENDPOINT_TO_PARTITIONS)
- .setHeartbeatIntervalMs(RECEIVED_HEARTBEAT_INTERVAL_MS)
+ .setHeartbeatIntervalMs((int)
RECEIVED_HEARTBEAT_INTERVAL_MS)
)
);
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
index b9c28406706..5845da7bede 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
@@ -1929,70 +1929,70 @@ public class StreamsMembershipManagerTest {
/**
 * Asserts the manager is in RECONCILING with shouldNotWaitForHeartbeatInterval(),
 * shouldSkipHeartbeat() and isLeavingGroup() all false.
 * The -/+ pair records the rename of shouldHeartbeatNow().
 */
private static void verifyInStateReconciling(final
StreamsMembershipManager membershipManager) {
assertEquals(MemberState.RECONCILING, membershipManager.state());
- assertFalse(membershipManager.shouldHeartbeatNow());
+ assertFalse(membershipManager.shouldNotWaitForHeartbeatInterval());
assertFalse(membershipManager.shouldSkipHeartbeat());
assertFalse(membershipManager.isLeavingGroup());
}
/**
 * Asserts the manager is in ACKNOWLEDGING with shouldNotWaitForHeartbeatInterval()
 * true, and shouldSkipHeartbeat() and isLeavingGroup() false.
 * The -/+ pair records the rename of shouldHeartbeatNow().
 */
private static void verifyInStateAcknowledging(final
StreamsMembershipManager membershipManager) {
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
- assertTrue(membershipManager.shouldHeartbeatNow());
+ assertTrue(membershipManager.shouldNotWaitForHeartbeatInterval());
assertFalse(membershipManager.shouldSkipHeartbeat());
assertFalse(membershipManager.isLeavingGroup());
}
/**
 * Asserts the manager is in LEAVING with shouldNotWaitForHeartbeatInterval() and
 * isLeavingGroup() true, and shouldSkipHeartbeat() false.
 * The -/+ pair records the rename of shouldHeartbeatNow().
 */
private static void verifyInStateLeaving(final StreamsMembershipManager
membershipManager) {
assertEquals(MemberState.LEAVING, membershipManager.state());
- assertTrue(membershipManager.shouldHeartbeatNow());
+ assertTrue(membershipManager.shouldNotWaitForHeartbeatInterval());
assertFalse(membershipManager.shouldSkipHeartbeat());
assertTrue(membershipManager.isLeavingGroup());
}
/**
 * Asserts the manager is in PREPARE_LEAVING with isLeavingGroup() true, and
 * shouldNotWaitForHeartbeatInterval() and shouldSkipHeartbeat() false.
 * The -/+ pair records the rename of shouldHeartbeatNow().
 */
private static void verifyInStatePrepareLeaving(final
StreamsMembershipManager membershipManager) {
assertEquals(MemberState.PREPARE_LEAVING, membershipManager.state());
- assertFalse(membershipManager.shouldHeartbeatNow());
+ assertFalse(membershipManager.shouldNotWaitForHeartbeatInterval());
assertFalse(membershipManager.shouldSkipHeartbeat());
assertTrue(membershipManager.isLeavingGroup());
}
/**
 * Asserts the manager is in UNSUBSCRIBED with shouldSkipHeartbeat() true, and
 * shouldNotWaitForHeartbeatInterval() and isLeavingGroup() false.
 * The -/+ pair records the rename of shouldHeartbeatNow().
 */
private static void verifyInStateUnsubscribed(final
StreamsMembershipManager membershipManager) {
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
- assertFalse(membershipManager.shouldHeartbeatNow());
+ assertFalse(membershipManager.shouldNotWaitForHeartbeatInterval());
assertTrue(membershipManager.shouldSkipHeartbeat());
assertFalse(membershipManager.isLeavingGroup());
}
/**
 * Asserts the manager is in JOINING with shouldNotWaitForHeartbeatInterval() true,
 * and shouldSkipHeartbeat() and isLeavingGroup() false.
 * The -/+ pair records the rename of shouldHeartbeatNow().
 */
private static void verifyInStateJoining(final StreamsMembershipManager
membershipManager) {
assertEquals(MemberState.JOINING, membershipManager.state());
- assertTrue(membershipManager.shouldHeartbeatNow());
+ assertTrue(membershipManager.shouldNotWaitForHeartbeatInterval());
assertFalse(membershipManager.shouldSkipHeartbeat());
assertFalse(membershipManager.isLeavingGroup());
}
/**
 * Asserts the manager is in STABLE with shouldNotWaitForHeartbeatInterval(),
 * shouldSkipHeartbeat() and isLeavingGroup() all false.
 * The -/+ pair records the rename of shouldHeartbeatNow().
 */
private static void verifyInStateStable(final StreamsMembershipManager
membershipManager) {
assertEquals(MemberState.STABLE, membershipManager.state());
- assertFalse(membershipManager.shouldHeartbeatNow());
+ assertFalse(membershipManager.shouldNotWaitForHeartbeatInterval());
assertFalse(membershipManager.shouldSkipHeartbeat());
assertFalse(membershipManager.isLeavingGroup());
}
/**
 * Asserts the manager is in FENCED with shouldSkipHeartbeat() true, and
 * shouldNotWaitForHeartbeatInterval() and isLeavingGroup() false.
 * The -/+ pair records the rename of shouldHeartbeatNow().
 */
private static void verifyInStateFenced(final StreamsMembershipManager
membershipManager) {
assertEquals(MemberState.FENCED, membershipManager.state());
- assertFalse(membershipManager.shouldHeartbeatNow());
+ assertFalse(membershipManager.shouldNotWaitForHeartbeatInterval());
assertTrue(membershipManager.shouldSkipHeartbeat());
assertFalse(membershipManager.isLeavingGroup());
}
/**
 * Asserts the manager is in FATAL with shouldSkipHeartbeat() true, and
 * shouldNotWaitForHeartbeatInterval() and isLeavingGroup() false.
 * The -/+ pair records the rename of shouldHeartbeatNow().
 */
private static void verifyInStateFatal(final StreamsMembershipManager
membershipManager) {
assertEquals(MemberState.FATAL, membershipManager.state());
- assertFalse(membershipManager.shouldHeartbeatNow());
+ assertFalse(membershipManager.shouldNotWaitForHeartbeatInterval());
assertTrue(membershipManager.shouldSkipHeartbeat());
assertFalse(membershipManager.isLeavingGroup());
}
/**
 * Asserts the manager is in STALE with shouldSkipHeartbeat() true, and
 * shouldNotWaitForHeartbeatInterval() and isLeavingGroup() false.
 * The -/+ pair records the rename of shouldHeartbeatNow().
 */
private static void verifyInStateStale(final StreamsMembershipManager
membershipManager) {
assertEquals(MemberState.STALE, membershipManager.state());
- assertFalse(membershipManager.shouldHeartbeatNow());
+ assertFalse(membershipManager.shouldNotWaitForHeartbeatInterval());
assertTrue(membershipManager.shouldSkipHeartbeat());
assertFalse(membershipManager.isLeavingGroup());
}