This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 53c41aca7ba KAFKA-16116: Rebalance Metrics for AsyncKafkaConsumer
(#15339)
53c41aca7ba is described below
commit 53c41aca7ba9469a0145023112f5fad254da4fa8
Author: Philip Nee <[email protected]>
AuthorDate: Wed Feb 28 01:52:01 2024 -0800
KAFKA-16116: Rebalance Metrics for AsyncKafkaConsumer (#15339)
Adding the following rebalance metrics to the consumer:
rebalance-latency-avg
rebalance-latency-max
rebalance-latency-total
rebalance-rate-per-hour
rebalance-total
failed-rebalance-rate-per-hour
failed-rebalance-total
last-rebalance-seconds-ago
Due to the differences between the two protocols, we need to redefine when a
rebalance starts and ends.
Start of Rebalance:
Current: Right before sending out JoinGroup
ConsumerGroup: When the client receives assignments from the HB
End of Rebalance - Successful Case:
Current: Receiving the SyncGroup response after transitioning to
"COMPLETING_REBALANCE"
ConsumerGroup: After completing reconciliation and right before sending out
"Ack" heartbeat
End of Rebalance - Failed Case:
Current: Any failure in the JoinGroup/SyncGroup response
ConsumerGroup: Failure in the heartbeat
Note: After all, we try to be consistent with the current protocol.
Rebalances start and end with sending and receiving network requests. Failures
in network requests signify rebalance failures to the user, and it is entirely
possible to have multiple failures before a successful one.
Reviewers: Lucas Brutschy <[email protected]>
---
.../internals/HeartbeatRequestManager.java | 3 +-
.../consumer/internals/MembershipManager.java | 7 +-
.../consumer/internals/MembershipManagerImpl.java | 62 +++++-
.../consumer/internals/RequestManagers.java | 3 +-
.../internals/metrics/RebalanceMetricsManager.java | 114 ++++++++++
.../consumer/internals/ConsumerTestBuilder.java | 26 +--
.../internals/HeartbeatRequestManagerTest.java | 8 +-
.../internals/MembershipManagerImplTest.java | 240 ++++++++++++++++++---
8 files changed, 408 insertions(+), 55 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
index 550e5b92ebf..d551dbe2508 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -327,7 +327,7 @@ public class HeartbeatRequestManager implements
RequestManager {
heartbeatRequestState.updateHeartbeatIntervalMs(response.data().heartbeatIntervalMs());
heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
heartbeatRequestState.resetTimer();
- membershipManager.onHeartbeatResponseReceived(response.data());
+ membershipManager.onHeartbeatSuccess(response.data());
maybeSendGroupMetadataUpdateEvent();
return;
}
@@ -357,6 +357,7 @@ public class HeartbeatRequestManager implements
RequestManager {
this.heartbeatState.reset();
this.heartbeatRequestState.onFailedAttempt(currentTimeMs);
+ membershipManager.onHeartbeatFailure();
switch (error) {
case NOT_COORDINATOR:
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
index a9c23d7b4d5..f0a641d140f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
@@ -68,7 +68,12 @@ public interface MembershipManager extends RequestManager {
*
* @param response Heartbeat response to extract member info and errors
from.
*/
- void onHeartbeatResponseReceived(ConsumerGroupHeartbeatResponseData
response);
+ void onHeartbeatSuccess(ConsumerGroupHeartbeatResponseData response);
+
+ /**
+ * Notify the member that an error heartbeat response was received.
+ */
+ void onHeartbeatFailure();
/**
* Update state when a heartbeat is sent out. This will transition out of
the states that end
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index a9b0f3b94d8..6f3947eea46 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -27,11 +27,13 @@ import
org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundE
import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import
org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider;
@@ -263,6 +265,11 @@ public class MembershipManagerImpl implements
MembershipManager {
* when the timer is reset, only when it completes releasing its
assignment.
*/
private CompletableFuture<Void> staleMemberAssignmentRelease;
+
+ /*
+ * Measures successful rebalance latency and number of failed rebalances.
+ */
+ private final RebalanceMetricsManager metricsManager;
private final Time time;
@@ -284,7 +291,35 @@ public class MembershipManagerImpl implements
MembershipManager {
LogContext logContext,
Optional<ClientTelemetryReporter>
clientTelemetryReporter,
BackgroundEventHandler backgroundEventHandler,
- Time time) {
+ Time time,
+ Metrics metrics) {
+ this(groupId,
+ groupInstanceId,
+ rebalanceTimeoutMs,
+ serverAssignor,
+ subscriptions,
+ commitRequestManager,
+ metadata,
+ logContext,
+ clientTelemetryReporter,
+ backgroundEventHandler,
+ time,
+ new RebalanceMetricsManager(metrics));
+ }
+
+ // Visible for testing
+ MembershipManagerImpl(String groupId,
+ Optional<String> groupInstanceId,
+ int rebalanceTimeoutMs,
+ Optional<String> serverAssignor,
+ SubscriptionState subscriptions,
+ CommitRequestManager commitRequestManager,
+ ConsumerMetadata metadata,
+ LogContext logContext,
+ Optional<ClientTelemetryReporter>
clientTelemetryReporter,
+ BackgroundEventHandler backgroundEventHandler,
+ Time time,
+ RebalanceMetricsManager metricsManager) {
this.groupId = groupId;
this.state = MemberState.UNSUBSCRIBED;
this.serverAssignor = serverAssignor;
@@ -301,6 +336,7 @@ public class MembershipManagerImpl implements
MembershipManager {
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
this.backgroundEventHandler = backgroundEventHandler;
this.time = time;
+ this.metricsManager = metricsManager;
}
/**
@@ -314,10 +350,27 @@ public class MembershipManagerImpl implements
MembershipManager {
throw new IllegalStateException(String.format("Invalid state
transition from %s to %s",
state, nextState));
}
+
+ if (isCompletingRebalance(state, nextState)) {
+ metricsManager.recordRebalanceEnded(time.milliseconds());
+ }
+ if (isStartingRebalance(state, nextState)) {
+ metricsManager.recordRebalanceStarted(time.milliseconds());
+ }
+
log.trace("Member {} with epoch {} transitioned from {} to {}.",
memberId, memberEpoch, state, nextState);
this.state = nextState;
}
+ private static boolean isCompletingRebalance(MemberState currentState,
MemberState nextState) {
+ return currentState == MemberState.RECONCILING &&
+ (nextState == MemberState.STABLE || nextState ==
MemberState.ACKNOWLEDGING);
+ }
+
+ private static boolean isStartingRebalance(MemberState currentState,
MemberState nextState) {
+ return currentState != MemberState.RECONCILING && nextState ==
MemberState.RECONCILING;
+ }
+
/**
* {@inheritDoc}
*/
@@ -354,7 +407,7 @@ public class MembershipManagerImpl implements
MembershipManager {
* {@inheritDoc}
*/
@Override
- public void onHeartbeatResponseReceived(ConsumerGroupHeartbeatResponseData
response) {
+ public void onHeartbeatSuccess(ConsumerGroupHeartbeatResponseData
response) {
if (response.errorCode() != Errors.NONE.code()) {
String errorMessage = String.format(
"Unexpected error in Heartbeat response. Expected no
error, but received: %s",
@@ -403,6 +456,11 @@ public class MembershipManagerImpl implements
MembershipManager {
}
}
+ @Override
+ public void onHeartbeatFailure() {
+ metricsManager.maybeRecordRebalanceFailed();
+ }
+
/**
* @return True if the consumer is not a member of the group.
*/
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 2d90a3ad708..0b4c043d4a4 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -189,7 +189,8 @@ public class RequestManagers implements Closeable {
logContext,
clientTelemetryReporter,
backgroundEventHandler,
- time);
+ time,
+ metrics);
membershipManager.registerStateListener(commit);
heartbeatRequestManager = new HeartbeatRequestManager(
logContext,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java
new file mode 100644
index 00000000000..a255487f37a
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.metrics;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX;
+
+public class RebalanceMetricsManager {
+ private final Sensor successfulRebalanceSensor;
+ private final Sensor failedRebalanceSensor;
+ private final String metricGroupName;
+
+ public final MetricName rebalanceLatencyAvg;
+ public final MetricName rebalanceLatencyMax;
+ public final MetricName rebalanceLatencyTotal;
+ public final MetricName rebalanceTotal;
+ public final MetricName rebalanceRatePerHour;
+ public final MetricName lastRebalanceSecondsAgo;
+ public final MetricName failedRebalanceTotal;
+ public final MetricName failedRebalanceRate;
+ private long lastRebalanceEndMs = -1L;
+ private long lastRebalanceStartMs = -1L;
+
+ public RebalanceMetricsManager(Metrics metrics) {
+ metricGroupName = CONSUMER_METRIC_GROUP_PREFIX +
COORDINATOR_METRICS_SUFFIX;
+
+ rebalanceLatencyAvg = createMetric(metrics, "rebalance-latency-avg",
+ "The average time taken for a group to complete a rebalance");
+ rebalanceLatencyMax = createMetric(metrics, "rebalance-latency-max",
+ "The max time taken for a group to complete a rebalance");
+ rebalanceLatencyTotal = createMetric(metrics,
"rebalance-latency-total",
+ "The total number of milliseconds spent in rebalances");
+ rebalanceTotal = createMetric(metrics, "rebalance-total",
+ "The total number of rebalance events");
+ rebalanceRatePerHour = createMetric(metrics, "rebalance-rate-per-hour",
+ "The number of rebalance events per hour");
+ failedRebalanceTotal = createMetric(metrics, "failed-rebalance-total",
+ "The total number of failed rebalance events");
+ failedRebalanceRate = createMetric(metrics,
"failed-rebalance-rate-per-hour",
+ "The number of failed rebalance events per hour");
+
+ successfulRebalanceSensor = metrics.sensor("rebalance-latency");
+ successfulRebalanceSensor.add(rebalanceLatencyAvg, new Avg());
+ successfulRebalanceSensor.add(rebalanceLatencyMax, new Max());
+ successfulRebalanceSensor.add(rebalanceLatencyTotal, new
CumulativeSum());
+ successfulRebalanceSensor.add(rebalanceTotal, new CumulativeCount());
+ successfulRebalanceSensor.add(rebalanceRatePerHour, new
Rate(TimeUnit.HOURS, new WindowedCount()));
+
+ failedRebalanceSensor = metrics.sensor("failed-rebalance");
+ failedRebalanceSensor.add(failedRebalanceTotal, new CumulativeSum());
+ failedRebalanceSensor.add(failedRebalanceRate, new
Rate(TimeUnit.HOURS, new WindowedCount()));
+
+ Measurable lastRebalance = (config, now) -> {
+ if (lastRebalanceEndMs == -1L)
+ return -1d;
+ else
+ return TimeUnit.SECONDS.convert(now - lastRebalanceEndMs,
TimeUnit.MILLISECONDS);
+ };
+ lastRebalanceSecondsAgo = createMetric(metrics,
+ "last-rebalance-seconds-ago",
+ "The number of seconds since the last rebalance event");
+ metrics.addMetric(lastRebalanceSecondsAgo, lastRebalance);
+ }
+
+ private MetricName createMetric(Metrics metrics, String name, String
description) {
+ return metrics.metricName(name, metricGroupName, description);
+ }
+
+ public void recordRebalanceStarted(long nowMs) {
+ lastRebalanceStartMs = nowMs;
+ }
+
+ public void recordRebalanceEnded(long nowMs) {
+ lastRebalanceEndMs = nowMs;
+ successfulRebalanceSensor.record(nowMs - lastRebalanceStartMs);
+ }
+
+ public void maybeRecordRebalanceFailed() {
+ if (lastRebalanceStartMs <= lastRebalanceEndMs)
+ return;
+ failedRebalanceSensor.record();
+ }
+
+ public boolean rebalanceStarted() {
+ return lastRebalanceStartMs > lastRebalanceEndMs;
+ }
+}
\ No newline at end of file
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
index d6ae6295060..d62d3d8a35e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
@@ -26,6 +26,7 @@ import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProces
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import
org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
+import
org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.requests.MetadataResponse;
@@ -196,18 +197,19 @@ public class ConsumerTestBuilder implements Closeable {
gi.groupInstanceId,
metrics));
MembershipManager mm = spy(
- new MembershipManagerImpl(
- gi.groupId,
- gi.groupInstanceId,
- groupRebalanceConfig.rebalanceTimeoutMs,
- gi.serverAssignor,
- subscriptions,
- commit,
- metadata,
- logContext,
- Optional.empty(),
- backgroundEventHandler,
- time
+ new MembershipManagerImpl(
+ gi.groupId,
+ gi.groupInstanceId,
+ groupRebalanceConfig.rebalanceTimeoutMs,
+ gi.serverAssignor,
+ subscriptions,
+ commit,
+ metadata,
+ logContext,
+ Optional.empty(),
+ backgroundEventHandler,
+ time,
+ mock(RebalanceMetricsManager.class)
)
);
HeartbeatRequestManager.HeartbeatState heartbeatState = spy(new
HeartbeatRequestManager.HeartbeatState(
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
index 5cf5b9e2d92..4d4492bcb47 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -306,7 +306,7 @@ public class HeartbeatRequestManagerTest {
new ConsumerGroupHeartbeatResponse(new
ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId)
.setMemberEpoch(memberEpoch));
- membershipManager.onHeartbeatResponseReceived(result.data());
+ membershipManager.onHeartbeatSuccess(result.data());
// Create a ConsumerHeartbeatRequest and verify the payload
NetworkClientDelegate.PollResult pollResult =
heartbeatRequestManager.poll(time.milliseconds());
@@ -441,7 +441,7 @@ public class HeartbeatRequestManagerTest {
switch (error) {
case NONE:
verify(backgroundEventHandler).add(any(GroupMetadataUpdateEvent.class));
- verify(membershipManager,
times(2)).onHeartbeatResponseReceived(mockResponse.data());
+ verify(membershipManager,
times(2)).onHeartbeatSuccess(mockResponse.data());
assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS,
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()));
break;
@@ -547,7 +547,7 @@ public class HeartbeatRequestManagerTest {
.setMemberEpoch(1)
.setAssignment(assignmentTopic1));
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId,
"topic1"));
- membershipManager.onHeartbeatResponseReceived(rs1.data());
+ membershipManager.onHeartbeatSuccess(rs1.data());
// We remain in RECONCILING state, as the assignment will be
reconciled on the next poll
assertEquals(MemberState.RECONCILING, membershipManager.state());
@@ -712,7 +712,7 @@ public class HeartbeatRequestManagerTest {
.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
.setMemberId(memberId)
.setMemberEpoch(memberEpoch));
- membershipManager.onHeartbeatResponseReceived(rs1.data());
+ membershipManager.onHeartbeatSuccess(rs1.data());
assertEquals(MemberState.STABLE, membershipManager.state());
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index 3294068b074..8a0fcf85758 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -16,12 +16,15 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
+import
org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
@@ -102,6 +105,8 @@ public class MembershipManagerImplTest {
private BlockingQueue<BackgroundEvent> backgroundEventQueue;
private BackgroundEventHandler backgroundEventHandler;
private Time time;
+ private Metrics metrics;
+ private RebalanceMetricsManager rebalanceMetricsManager;
@BeforeEach
public void setup() {
@@ -111,7 +116,9 @@ public class MembershipManagerImplTest {
commitRequestManager =
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
backgroundEventQueue = testBuilder.backgroundEventQueue;
backgroundEventHandler = testBuilder.backgroundEventHandler;
- time = testBuilder.time;
+ time = new MockTime(0);
+ metrics = new Metrics(time);
+ rebalanceMetricsManager = new RebalanceMetricsManager(metrics);
}
@AfterEach
@@ -135,15 +142,16 @@ public class MembershipManagerImplTest {
return spy(new MembershipManagerImpl(
GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT,
Optional.empty(),
subscriptionState, commitRequestManager, metadata, logContext,
Optional.empty(),
- backgroundEventHandler, time));
+ backgroundEventHandler, time, rebalanceMetricsManager));
}
private MembershipManagerImpl createMembershipManagerJoiningGroup(String
groupInstanceId,
String
serverAssignor) {
- MembershipManagerImpl manager = new MembershipManagerImpl(
+ MembershipManagerImpl manager = spy(new MembershipManagerImpl(
GROUP_ID, Optional.ofNullable(groupInstanceId),
REBALANCE_TIMEOUT,
Optional.ofNullable(serverAssignor), subscriptionState,
commitRequestManager,
- metadata, logContext, Optional.empty(),
backgroundEventHandler, time);
+ metadata, logContext, Optional.empty(),
backgroundEventHandler, time,
+ rebalanceMetricsManager));
manager.transitionToJoining();
return manager;
}
@@ -160,7 +168,6 @@ public class MembershipManagerImplTest {
@Test
public void testMembershipManagerInitSupportsEmptyGroupInstanceId() {
createMembershipManagerJoiningGroup();
- createMembershipManagerJoiningGroup(null, null);
}
@Test
@@ -169,7 +176,7 @@ public class MembershipManagerImplTest {
MembershipManagerImpl manager = new MembershipManagerImpl(
GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT,
Optional.empty(),
subscriptionState, commitRequestManager, metadata, logContext,
Optional.empty(),
- backgroundEventHandler, time);
+ backgroundEventHandler, time, rebalanceMetricsManager);
manager.transitionToJoining();
clearInvocations(metadata);
@@ -200,12 +207,12 @@ public class MembershipManagerImplTest {
ConsumerGroupHeartbeatResponse responseWithoutAssignment =
createConsumerGroupHeartbeatResponse(null);
-
membershipManager.onHeartbeatResponseReceived(responseWithoutAssignment.data());
+ membershipManager.onHeartbeatSuccess(responseWithoutAssignment.data());
assertNotEquals(MemberState.RECONCILING, membershipManager.state());
ConsumerGroupHeartbeatResponse responseWithAssignment =
createConsumerGroupHeartbeatResponse(createAssignment(true));
-
membershipManager.onHeartbeatResponseReceived(responseWithAssignment.data());
+ membershipManager.onHeartbeatSuccess(responseWithAssignment.data());
assertEquals(MemberState.RECONCILING, membershipManager.state());
}
@@ -213,7 +220,7 @@ public class MembershipManagerImplTest {
public void testMemberIdAndEpochResetOnFencedMembers() {
MembershipManagerImpl membershipManager =
createMembershipManagerJoiningGroup();
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(null);
-
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+ membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
assertEquals(MemberState.STABLE, membershipManager.state());
assertEquals(MEMBER_ID, membershipManager.memberId());
assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
@@ -230,7 +237,7 @@ public class MembershipManagerImplTest {
MembershipManagerImpl membershipManager =
createMembershipManagerJoiningGroup();
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(null);
-
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+ membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
assertEquals(MemberState.STABLE, membershipManager.state());
assertEquals(MEMBER_ID, membershipManager.memberId());
assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
@@ -246,7 +253,7 @@ public class MembershipManagerImplTest {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(
GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT,
Optional.empty(),
subscriptionState, commitRequestManager, metadata, logContext,
Optional.empty(),
- backgroundEventHandler, time);
+ backgroundEventHandler, time, rebalanceMetricsManager);
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
membershipManager.transitionToJoining();
@@ -299,7 +306,7 @@ public class MembershipManagerImplTest {
membershipManager.registerStateListener(listener);
int epoch = 5;
- membershipManager.onHeartbeatResponseReceived(new
ConsumerGroupHeartbeatResponseData()
+ membershipManager.onHeartbeatSuccess(new
ConsumerGroupHeartbeatResponseData()
.setErrorCode(Errors.NONE.code())
.setMemberId(MEMBER_ID)
.setMemberEpoch(epoch));
@@ -307,7 +314,7 @@ public class MembershipManagerImplTest {
verify(listener).onMemberEpochUpdated(Optional.of(epoch),
Optional.of(MEMBER_ID));
clearInvocations(listener);
- membershipManager.onHeartbeatResponseReceived(new
ConsumerGroupHeartbeatResponseData()
+ membershipManager.onHeartbeatSuccess(new
ConsumerGroupHeartbeatResponseData()
.setErrorCode(Errors.NONE.code())
.setMemberId(MEMBER_ID)
.setMemberEpoch(epoch));
@@ -316,7 +323,7 @@ public class MembershipManagerImplTest {
private void mockStableMember(MembershipManagerImpl membershipManager) {
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(null);
-
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+ membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
assertEquals(MemberState.STABLE, membershipManager.state());
assertEquals(MEMBER_ID, membershipManager.memberId());
assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
@@ -709,7 +716,7 @@ public class MembershipManagerImplTest {
CompletableFuture<Void> leaveResult = membershipManager.leaveGroup();
-
membershipManager.onHeartbeatResponseReceived(createConsumerGroupHeartbeatResponse(createAssignment(true)).data());
+
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(createAssignment(true)).data());
assertEquals(MemberState.LEAVING, membershipManager.state());
assertEquals(-1, membershipManager.memberEpoch());
@@ -726,7 +733,7 @@ public class MembershipManagerImplTest {
when(membershipManager.state()).thenReturn(state);
ConsumerGroupHeartbeatResponseData responseData =
mock(ConsumerGroupHeartbeatResponseData.class);
- membershipManager.onHeartbeatResponseReceived(responseData);
+ membershipManager.onHeartbeatSuccess(responseData);
assertEquals(state, membershipManager.state());
verify(responseData, never()).memberId();
@@ -861,7 +868,7 @@ public class MembershipManagerImplTest {
public void testFatalFailureWhenStateIsStable() {
MembershipManagerImpl membershipManager =
createMembershipManagerJoiningGroup();
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(null);
-
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+ membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
assertEquals(MemberState.STABLE, membershipManager.state());
testStateUpdateOnFatalFailure(membershipManager);
@@ -930,9 +937,9 @@ public class MembershipManagerImplTest {
// Updating state with a heartbeat response containing errors cannot
be performed and
// should fail.
ConsumerGroupHeartbeatResponse unknownMemberResponse =
- createConsumerGroupHeartbeatResponseWithError();
+
createConsumerGroupHeartbeatResponseWithError(Errors.UNKNOWN_MEMBER_ID);
assertThrows(IllegalArgumentException.class,
- () ->
membershipManager.onHeartbeatResponseReceived(unknownMemberResponse.data()));
+ () ->
membershipManager.onHeartbeatSuccess(unknownMemberResponse.data()));
}
/**
@@ -1099,7 +1106,7 @@ public class MembershipManagerImplTest {
// Target assignment received again with the same unresolved topic.
Client should keep it
// as unresolved.
clearInvocations(subscriptionState);
-
membershipManager.onHeartbeatResponseReceived(createConsumerGroupHeartbeatResponse(assignment).data());
+
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment).data());
assertEquals(MemberState.RECONCILING, membershipManager.state());
assertEquals(Collections.singleton(topic2),
membershipManager.topicsAwaitingReconciliation());
verify(subscriptionState,
never()).assignFromSubscribed(anyCollection());
@@ -1173,6 +1180,9 @@ public class MembershipManagerImplTest {
verify(subscriptionState,
never()).assignFromSubscribed(anyCollection());
assertEquals(MemberState.STABLE, membershipManager.state());
+
+ assertEquals(1.0d, getMetricValue(metrics,
rebalanceMetricsManager.rebalanceTotal));
+ assertEquals(0.0d, getMetricValue(metrics,
rebalanceMetricsManager.failedRebalanceTotal));
}
@Test
@@ -1376,7 +1386,6 @@ public class MembershipManagerImplTest {
@Test
public void testListenerCallbacksBasic() {
- // Step 1: set up mocks
MembershipManagerImpl membershipManager = createMemberInStableState();
CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener();
ConsumerRebalanceListenerInvoker invoker =
consumerRebalanceListenerInvoker();
@@ -1685,7 +1694,7 @@ public class MembershipManagerImplTest {
public void testTransitionToLeavingWhileStableDueToStaleMember() {
MembershipManagerImpl membershipManager =
createMembershipManagerJoiningGroup();
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(null);
-
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+ membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
doNothing().when(subscriptionState).assignFromSubscribed(any());
assertEquals(MemberState.STABLE, membershipManager.state());
assertLeaveGroupDueToExpiredPollAndTransitionToStale(membershipManager);
@@ -1832,8 +1841,8 @@ public class MembershipManagerImplTest {
return new ConsumerRebalanceListenerInvoker(
new LogContext(),
subscriptionState,
- new MockTime(1),
- new RebalanceCallbackMetricsManager(new Metrics())
+ time,
+ new RebalanceCallbackMetricsManager(new Metrics(time))
);
}
@@ -1930,7 +1939,7 @@ public class MembershipManagerImplTest {
@Test
public void
testMemberJoiningTransitionsToStableWhenReceivingEmptyAssignment() {
- MembershipManagerImpl membershipManager =
createMembershipManagerJoiningGroup(null);
+ MembershipManagerImpl membershipManager =
createMembershipManagerJoiningGroup();
assertEquals(MemberState.JOINING, membershipManager.state());
receiveEmptyAssignment(membershipManager);
@@ -1940,6 +1949,144 @@ public class MembershipManagerImplTest {
assertEquals(MemberState.STABLE, membershipManager.state());
}
+    /**
+     * Fails a heartbeat on a member created in the stable state and checks that neither the
+     * rebalance total nor the failed rebalance total metric moves away from 0.
+     */
+    @Test
+    public void testMetricsWhenHeartbeatFailed() {
+        MembershipManagerImpl stableMember = createMemberInStableState();
+        stableMember.onHeartbeatFailure();
+
+        // Not expecting rebalance failures without assignments being reconciled
+        Object completedRebalances = getMetricValue(metrics, rebalanceMetricsManager.rebalanceTotal);
+        assertEquals(0.0d, completedRebalances);
+        Object failedRebalances = getMetricValue(metrics, rebalanceMetricsManager.failedRebalanceTotal);
+        assertEquals(0.0d, failedRebalances);
+    }
+
+    /**
+     * Drives one reconciliation to completion and checks every rebalance metric afterwards.
+     *
+     * The clock is advanced by 1234 ms between receiving the empty assignment and the poll that
+     * triggers the reconciliation, so latency total/avg/max are all expected to equal that
+     * duration, with exactly one successful rebalance and no failed ones recorded.
+     */
+    @Test
+    public void testRebalanceMetricsOnSuccessfulRebalance() {
+        MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
+        ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null);
+        membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
+        mockOwnedPartition(membershipManager, Uuid.randomUuid(), "topic1");
+
+        CompletableFuture<Void> commitResult = mockRevocationNoCallbacks(true);
+
+        receiveEmptyAssignment(membershipManager);
+        long reconciliationDurationMs = 1234;
+        time.sleep(reconciliationDurationMs);
+
+        membershipManager.poll(time.milliseconds());
+        // Complete commit request to complete the callback invocation
+        commitResult.complete(null);
+
+        assertEquals((double) reconciliationDurationMs, getMetricValue(metrics, rebalanceMetricsManager.rebalanceLatencyTotal));
+        assertEquals((double) reconciliationDurationMs, getMetricValue(metrics, rebalanceMetricsManager.rebalanceLatencyAvg));
+        assertEquals((double) reconciliationDurationMs, getMetricValue(metrics, rebalanceMetricsManager.rebalanceLatencyMax));
+        assertEquals(1d, getMetricValue(metrics, rebalanceMetricsManager.rebalanceTotal));
+        // Argument order is (expected, actual, delta): the measured rate is the actual value and
+        // 1d is the tolerance. Passing the metric as the delta would accept any rate >= 119.
+        assertEquals(120d, (double) getMetricValue(metrics, rebalanceMetricsManager.rebalanceRatePerHour), 1d);
+        assertEquals(0d, getMetricValue(metrics, rebalanceMetricsManager.failedRebalanceRate));
+        assertEquals(0d, getMetricValue(metrics, rebalanceMetricsManager.failedRebalanceTotal));
+        assertEquals(0d, getMetricValue(metrics, rebalanceMetricsManager.lastRebalanceSecondsAgo));
+    }
+
+    /**
+     * Runs two reconciliations back to back and checks the aggregated rebalance metrics.
+     *
+     * The listener advances the clock by 1453 ms on every callback and accumulates the time it
+     * slept. The first reconciliation invokes one callback (partitions assigned); the second
+     * invokes two (partitions revoked, then assigned). The accumulated sleep of each round is
+     * used as the expected latency of that rebalance.
+     */
+    @Test
+    public void testRebalanceMetricsForMultipleReconcilations() {
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker();
+
+        String topicName = "topic1";
+        Uuid topicId = Uuid.randomUuid();
+
+        SleepyRebalanceListener listener = new SleepyRebalanceListener(1453, time);
+        when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet());
+        when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+        when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+        doNothing().when(subscriptionState).markPendingRevocation(anySet());
+        when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName));
+
+        receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
+
+        membershipManager.poll(time.milliseconds());
+
+        // assign partitions
+        performCallback(
+            membershipManager,
+            invoker,
+            ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+            topicPartitions(topicName, 0, 1),
+            true
+        );
+
+        // Time slept by the listener during the first rebalance; reset so the second round
+        // is measured on its own.
+        long firstRebalanceMs = listener.sleepMs;
+        listener.reset();
+
+        // ack
+        membershipManager.onHeartbeatRequestSent();
+
+        // revoke all
+        when(subscriptionState.assignedPartitions()).thenReturn(topicPartitions(topicName, 0, 1));
+        receiveAssignment(topicId, Collections.singletonList(2), membershipManager);
+
+        membershipManager.poll(time.milliseconds());
+
+        performCallback(
+            membershipManager,
+            invoker,
+            ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+            topicPartitions(topicName, 0, 1),
+            true
+        );
+
+        // assign new partition 2
+        performCallback(
+            membershipManager,
+            invoker,
+            ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+            topicPartitions(topicName, 2),
+            true
+        );
+        membershipManager.onHeartbeatRequestSent();
+
+        long secondRebalanceMs = listener.sleepMs;
+        long total = firstRebalanceMs + secondRebalanceMs;
+        double avg = total / 2.0d;
+        long max = Math.max(firstRebalanceMs, secondRebalanceMs);
+        assertEquals((double) total, getMetricValue(metrics, rebalanceMetricsManager.rebalanceLatencyTotal));
+        assertEquals(avg, (double) getMetricValue(metrics, rebalanceMetricsManager.rebalanceLatencyAvg), 1d);
+        assertEquals((double) max, getMetricValue(metrics, rebalanceMetricsManager.rebalanceLatencyMax));
+        assertEquals(2d, getMetricValue(metrics, rebalanceMetricsManager.rebalanceTotal));
+        // rate is not tested because it is subject to Rate implementation
+        assertEquals(0d, getMetricValue(metrics, rebalanceMetricsManager.failedRebalanceRate));
+        assertEquals(0d, getMetricValue(metrics, rebalanceMetricsManager.failedRebalanceTotal));
+        assertEquals(0d, getMetricValue(metrics, rebalanceMetricsManager.lastRebalanceSecondsAgo));
+    }
+
+    /**
+     * Starts a rebalance by delivering a target assignment, then fails the heartbeat before the
+     * rebalance completes. Expects one failed rebalance to be recorded, no successful rebalance
+     * or latency, and "last rebalance seconds ago" to report -1.
+     */
+    @Test
+    public void testRebalanceMetricsOnFailedRebalance() {
+        MembershipManagerImpl member = createMembershipManagerJoiningGroup();
+        member.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(null).data());
+
+        receiveAssignment(Uuid.randomUuid(), Arrays.asList(0, 1), member);
+
+        // sleep for an arbitrary amount
+        time.sleep(2300);
+
+        assertTrue(rebalanceMetricsManager.rebalanceStarted());
+        member.onHeartbeatFailure();
+
+        assertEquals(0d, getMetricValue(metrics, rebalanceMetricsManager.rebalanceLatencyTotal));
+        assertEquals(0d, getMetricValue(metrics, rebalanceMetricsManager.rebalanceTotal));
+        assertEquals(120d, getMetricValue(metrics, rebalanceMetricsManager.failedRebalanceRate));
+        assertEquals(1d, getMetricValue(metrics, rebalanceMetricsManager.failedRebalanceTotal));
+        assertEquals(-1d, getMetricValue(metrics, rebalanceMetricsManager.lastRebalanceSecondsAgo));
+    }
+
+    /**
+     * Looks up the metric registered under {@code name} in {@code metrics} and returns its
+     * current value. The metric is expected to be registered already; the lookup result is
+     * dereferenced without a null check.
+     */
+    private Object getMetricValue(Metrics metrics, MetricName name) {
+        return metrics.metrics()
+            .get(name)
+            .metricValue();
+    }
+
private MembershipManagerImpl
mockMemberSuccessfullyReceivesAndAcksAssignment(
Uuid topicId, String topicName, List<Integer> partitions) {
MembershipManagerImpl membershipManager =
createMembershipManagerJoiningGroup();
@@ -2119,7 +2266,7 @@ public class MembershipManagerImplTest {
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty()).thenReturn(Optional.empty());
-
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+ membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
membershipManager.poll(time.milliseconds());
if (expectSubscriptionUpdated) {
@@ -2136,9 +2283,9 @@ public class MembershipManagerImplTest {
}
private MembershipManagerImpl createMemberInStableState(String
groupInstanceId) {
- MembershipManagerImpl membershipManager =
createMembershipManagerJoiningGroup(groupInstanceId);
+ MembershipManagerImpl membershipManager =
createMembershipManagerJoiningGroup(groupInstanceId, null);
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(null);
-
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+ membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
assertEquals(MemberState.STABLE, membershipManager.state());
return membershipManager;
}
@@ -2150,7 +2297,7 @@ public class MembershipManagerImplTest {
.setTopicId(tp.getKey())
.setPartitions(new
ArrayList<>(tp.getValue()))).collect(Collectors.toList()));
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(targetAssignment);
-
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+ membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
}
private void receiveAssignment(Uuid topicId, List<Integer> partitions,
MembershipManager membershipManager) {
@@ -2160,7 +2307,7 @@ public class MembershipManagerImplTest {
.setTopicId(topicId)
.setPartitions(partitions)));
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(targetAssignment);
-
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+ membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
}
private void receiveAssignmentAfterRejoin(Uuid topicId, List<Integer>
partitions, MembershipManager membershipManager) {
@@ -2171,7 +2318,7 @@ public class MembershipManagerImplTest {
.setPartitions(partitions)));
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponseWithBumpedEpoch(targetAssignment);
-
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+ membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
}
private void receiveEmptyAssignment(MembershipManager membershipManager) {
@@ -2179,7 +2326,7 @@ public class MembershipManagerImplTest {
ConsumerGroupHeartbeatResponseData.Assignment targetAssignment = new
ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(Collections.emptyList());
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(targetAssignment);
-
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+ membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
}
/**
@@ -2307,9 +2454,9 @@ public class MembershipManagerImplTest {
.setAssignment(assignment));
}
- private ConsumerGroupHeartbeatResponse
createConsumerGroupHeartbeatResponseWithError() {
+ private ConsumerGroupHeartbeatResponse
createConsumerGroupHeartbeatResponseWithError(Errors error) {
return new ConsumerGroupHeartbeatResponse(new
ConsumerGroupHeartbeatResponseData()
- .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
+ .setErrorCode(error.code())
.setMemberId(MEMBER_ID)
.setMemberEpoch(5));
}
@@ -2356,4 +2503,29 @@ public class MembershipManagerImplTest {
Arguments.of(MemberState.STALE));
}
+    /**
+     * Rebalance listener for tests that sleeps on the supplied {@link Time} for a fixed duration
+     * in every revoked/assigned callback and keeps a running total of the time slept, so a test
+     * can derive the expected rebalance latency. {@link #reset()} clears the running total.
+     */
+    private static class SleepyRebalanceListener implements ConsumerRebalanceListener {
+        // Total time slept across callbacks since construction or the last reset().
+        private long sleepMs;
+        private final long sleepDurationMs;
+        private final Time time;
+
+        SleepyRebalanceListener(long sleepDurationMs, Time time) {
+            this.sleepDurationMs = sleepDurationMs;
+            this.time = time;
+        }
+
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            recordAndSleep();
+        }
+
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+            recordAndSleep();
+        }
+
+        public void reset() {
+            sleepMs = 0;
+        }
+
+        // Shared callback body: add one sleep interval to the running total, then sleep.
+        private void recordAndSleep() {
+            sleepMs += sleepDurationMs;
+            time.sleep(sleepDurationMs);
+        }
+    }
}