This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 53c41aca7ba KAFKA-16116: Rebalance Metrics for AsyncKafkaConsumer (#15339) 53c41aca7ba is described below commit 53c41aca7ba9469a0145023112f5fad254da4fa8 Author: Philip Nee <p...@confluent.io> AuthorDate: Wed Feb 28 01:52:01 2024 -0800 KAFKA-16116: Rebalance Metrics for AsyncKafkaConsumer (#15339) Adding the following rebalance metrics to the consumer: rebalance-latency-avg, rebalance-latency-max, rebalance-latency-total, rebalance-rate-per-hour, rebalance-total, failed-rebalance-rate-per-hour, failed-rebalance-total, last-rebalance-seconds-ago. Due to the differences between the protocols, we need to redefine when a rebalance starts and ends. Below, "Current" refers to the existing group protocol and "ConsumerGroup" to the new consumer group protocol. Start of Rebalance: Current: right before sending out the JoinGroup request. ConsumerGroup: when the client receives assignments in a heartbeat (HB) response. End of Rebalance - Successful Case: Current: receiving the SyncGroup response after transitioning to "COMPLETING_REBALANCE". ConsumerGroup: after completing reconciliation and right before sending out the acknowledging ("Ack") heartbeat. End of Rebalance - Failed Case: Current: any failure in the JoinGroup/SyncGroup response. ConsumerGroup: a heartbeat failure while a rebalance is in progress. Note: Overall, we try to be consistent with the current protocol. Rebalances start and end with sending and receiving network requests, and failures in those network requests signify failed rebalances. It is entirely possible to have multiple failures before a successful rebalance.
Reviewers: Lucas Brutschy <lbruts...@confluent.io> --- .../internals/HeartbeatRequestManager.java | 3 +- .../consumer/internals/MembershipManager.java | 7 +- .../consumer/internals/MembershipManagerImpl.java | 62 +++++- .../consumer/internals/RequestManagers.java | 3 +- .../internals/metrics/RebalanceMetricsManager.java | 114 ++++++++++ .../consumer/internals/ConsumerTestBuilder.java | 26 +-- .../internals/HeartbeatRequestManagerTest.java | 8 +- .../internals/MembershipManagerImplTest.java | 240 ++++++++++++++++++--- 8 files changed, 408 insertions(+), 55 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java index 550e5b92ebf..d551dbe2508 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java @@ -327,7 +327,7 @@ public class HeartbeatRequestManager implements RequestManager { heartbeatRequestState.updateHeartbeatIntervalMs(response.data().heartbeatIntervalMs()); heartbeatRequestState.onSuccessfulAttempt(currentTimeMs); heartbeatRequestState.resetTimer(); - membershipManager.onHeartbeatResponseReceived(response.data()); + membershipManager.onHeartbeatSuccess(response.data()); maybeSendGroupMetadataUpdateEvent(); return; } @@ -357,6 +357,7 @@ public class HeartbeatRequestManager implements RequestManager { this.heartbeatState.reset(); this.heartbeatRequestState.onFailedAttempt(currentTimeMs); + membershipManager.onHeartbeatFailure(); switch (error) { case NOT_COORDINATOR: diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java index a9c23d7b4d5..f0a641d140f 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java @@ -68,7 +68,12 @@ public interface MembershipManager extends RequestManager { * * @param response Heartbeat response to extract member info and errors from. */ - void onHeartbeatResponseReceived(ConsumerGroupHeartbeatResponseData response); + void onHeartbeatSuccess(ConsumerGroupHeartbeatResponseData response); + + /** + * Notify the member that an error heartbeat response was received. + */ + void onHeartbeatFailure(); /** * Update state when a heartbeat is sent out. This will transition out of the states that end diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java index a9b0f3b94d8..6f3947eea46 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java @@ -27,11 +27,13 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundE import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent; import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent; +import org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider; @@ -263,6 +265,11 @@ public class MembershipManagerImpl implements MembershipManager { * when the timer is reset, only when it completes releasing its assignment. */ private CompletableFuture<Void> staleMemberAssignmentRelease; + + /* + * Measures successful rebalance latency and number of failed rebalances. + */ + private final RebalanceMetricsManager metricsManager; private final Time time; @@ -284,7 +291,35 @@ public class MembershipManagerImpl implements MembershipManager { LogContext logContext, Optional<ClientTelemetryReporter> clientTelemetryReporter, BackgroundEventHandler backgroundEventHandler, - Time time) { + Time time, + Metrics metrics) { + this(groupId, + groupInstanceId, + rebalanceTimeoutMs, + serverAssignor, + subscriptions, + commitRequestManager, + metadata, + logContext, + clientTelemetryReporter, + backgroundEventHandler, + time, + new RebalanceMetricsManager(metrics)); + } + + // Visible for testing + MembershipManagerImpl(String groupId, + Optional<String> groupInstanceId, + int rebalanceTimeoutMs, + Optional<String> serverAssignor, + SubscriptionState subscriptions, + CommitRequestManager commitRequestManager, + ConsumerMetadata metadata, + LogContext logContext, + Optional<ClientTelemetryReporter> clientTelemetryReporter, + BackgroundEventHandler backgroundEventHandler, + Time time, + RebalanceMetricsManager metricsManager) { this.groupId = groupId; this.state = MemberState.UNSUBSCRIBED; this.serverAssignor = serverAssignor; @@ -301,6 +336,7 @@ public class MembershipManagerImpl implements MembershipManager { this.rebalanceTimeoutMs = rebalanceTimeoutMs; this.backgroundEventHandler = backgroundEventHandler; this.time = time; + this.metricsManager = metricsManager; } /** @@ -314,10 +350,27 @@ public class MembershipManagerImpl implements MembershipManager { throw new 
IllegalStateException(String.format("Invalid state transition from %s to %s", state, nextState)); } + + if (isCompletingRebalance(state, nextState)) { + metricsManager.recordRebalanceEnded(time.milliseconds()); + } + if (isStartingRebalance(state, nextState)) { + metricsManager.recordRebalanceStarted(time.milliseconds()); + } + log.trace("Member {} with epoch {} transitioned from {} to {}.", memberId, memberEpoch, state, nextState); this.state = nextState; } + private static boolean isCompletingRebalance(MemberState currentState, MemberState nextState) { + return currentState == MemberState.RECONCILING && + (nextState == MemberState.STABLE || nextState == MemberState.ACKNOWLEDGING); + } + + private static boolean isStartingRebalance(MemberState currentState, MemberState nextState) { + return currentState != MemberState.RECONCILING && nextState == MemberState.RECONCILING; + } + /** * {@inheritDoc} */ @@ -354,7 +407,7 @@ public class MembershipManagerImpl implements MembershipManager { * {@inheritDoc} */ @Override - public void onHeartbeatResponseReceived(ConsumerGroupHeartbeatResponseData response) { + public void onHeartbeatSuccess(ConsumerGroupHeartbeatResponseData response) { if (response.errorCode() != Errors.NONE.code()) { String errorMessage = String.format( "Unexpected error in Heartbeat response. Expected no error, but received: %s", @@ -403,6 +456,11 @@ public class MembershipManagerImpl implements MembershipManager { } } + @Override + public void onHeartbeatFailure() { + metricsManager.maybeRecordRebalanceFailed(); + } + /** * @return True if the consumer is not a member of the group. 
*/ diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index 2d90a3ad708..0b4c043d4a4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -189,7 +189,8 @@ public class RequestManagers implements Closeable { logContext, clientTelemetryReporter, backgroundEventHandler, - time); + time, + metrics); membershipManager.registerStateListener(commit); heartbeatRequestManager = new HeartbeatRequestManager( logContext, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java new file mode 100644 index 00000000000..a255487f37a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals.metrics; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeCount; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.WindowedCount; + +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX; + +public class RebalanceMetricsManager { + private final Sensor successfulRebalanceSensor; + private final Sensor failedRebalanceSensor; + private final String metricGroupName; + + public final MetricName rebalanceLatencyAvg; + public final MetricName rebalanceLatencyMax; + public final MetricName rebalanceLatencyTotal; + public final MetricName rebalanceTotal; + public final MetricName rebalanceRatePerHour; + public final MetricName lastRebalanceSecondsAgo; + public final MetricName failedRebalanceTotal; + public final MetricName failedRebalanceRate; + private long lastRebalanceEndMs = -1L; + private long lastRebalanceStartMs = -1L; + + public RebalanceMetricsManager(Metrics metrics) { + metricGroupName = CONSUMER_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX; + + rebalanceLatencyAvg = createMetric(metrics, "rebalance-latency-avg", + "The average time taken for a group to complete a rebalance"); + rebalanceLatencyMax = createMetric(metrics, "rebalance-latency-max", + "The max time taken for a group to complete a rebalance"); + rebalanceLatencyTotal = createMetric(metrics, "rebalance-latency-total", + "The total number of 
milliseconds spent in rebalances"); + rebalanceTotal = createMetric(metrics, "rebalance-total", + "The total number of rebalance events"); + rebalanceRatePerHour = createMetric(metrics, "rebalance-rate-per-hour", + "The number of rebalance events per hour"); + failedRebalanceTotal = createMetric(metrics, "failed-rebalance-total", + "The total number of failed rebalance events"); + failedRebalanceRate = createMetric(metrics, "failed-rebalance-rate-per-hour", + "The number of failed rebalance events per hour"); + + successfulRebalanceSensor = metrics.sensor("rebalance-latency"); + successfulRebalanceSensor.add(rebalanceLatencyAvg, new Avg()); + successfulRebalanceSensor.add(rebalanceLatencyMax, new Max()); + successfulRebalanceSensor.add(rebalanceLatencyTotal, new CumulativeSum()); + successfulRebalanceSensor.add(rebalanceTotal, new CumulativeCount()); + successfulRebalanceSensor.add(rebalanceRatePerHour, new Rate(TimeUnit.HOURS, new WindowedCount())); + + failedRebalanceSensor = metrics.sensor("failed-rebalance"); + failedRebalanceSensor.add(failedRebalanceTotal, new CumulativeSum()); + failedRebalanceSensor.add(failedRebalanceRate, new Rate(TimeUnit.HOURS, new WindowedCount())); + + Measurable lastRebalance = (config, now) -> { + if (lastRebalanceEndMs == -1L) + return -1d; + else + return TimeUnit.SECONDS.convert(now - lastRebalanceEndMs, TimeUnit.MILLISECONDS); + }; + lastRebalanceSecondsAgo = createMetric(metrics, + "last-rebalance-seconds-ago", + "The number of seconds since the last rebalance event"); + metrics.addMetric(lastRebalanceSecondsAgo, lastRebalance); + } + + private MetricName createMetric(Metrics metrics, String name, String description) { + return metrics.metricName(name, metricGroupName, description); + } + + public void recordRebalanceStarted(long nowMs) { + lastRebalanceStartMs = nowMs; + } + + public void recordRebalanceEnded(long nowMs) { + lastRebalanceEndMs = nowMs; + successfulRebalanceSensor.record(nowMs - lastRebalanceStartMs); + } + + 
public void maybeRecordRebalanceFailed() { + if (lastRebalanceStartMs <= lastRebalanceEndMs) + return; + failedRebalanceSensor.record(); + } + + public boolean rebalanceStarted() { + return lastRebalanceStartMs > lastRebalanceEndMs; + } +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java index d6ae6295060..d62d3d8a35e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java @@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProces import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager; +import org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.requests.MetadataResponse; @@ -196,18 +197,19 @@ public class ConsumerTestBuilder implements Closeable { gi.groupInstanceId, metrics)); MembershipManager mm = spy( - new MembershipManagerImpl( - gi.groupId, - gi.groupInstanceId, - groupRebalanceConfig.rebalanceTimeoutMs, - gi.serverAssignor, - subscriptions, - commit, - metadata, - logContext, - Optional.empty(), - backgroundEventHandler, - time + new MembershipManagerImpl( + gi.groupId, + gi.groupInstanceId, + groupRebalanceConfig.rebalanceTimeoutMs, + gi.serverAssignor, + subscriptions, + commit, + metadata, + logContext, + Optional.empty(), + backgroundEventHandler, + time, + mock(RebalanceMetricsManager.class) ) ); HeartbeatRequestManager.HeartbeatState 
heartbeatState = spy(new HeartbeatRequestManager.HeartbeatState( diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java index 5cf5b9e2d92..4d4492bcb47 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java @@ -306,7 +306,7 @@ public class HeartbeatRequestManagerTest { new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId) .setMemberEpoch(memberEpoch)); - membershipManager.onHeartbeatResponseReceived(result.data()); + membershipManager.onHeartbeatSuccess(result.data()); // Create a ConsumerHeartbeatRequest and verify the payload NetworkClientDelegate.PollResult pollResult = heartbeatRequestManager.poll(time.milliseconds()); @@ -441,7 +441,7 @@ public class HeartbeatRequestManagerTest { switch (error) { case NONE: verify(backgroundEventHandler).add(any(GroupMetadataUpdateEvent.class)); - verify(membershipManager, times(2)).onHeartbeatResponseReceived(mockResponse.data()); + verify(membershipManager, times(2)).onHeartbeatSuccess(mockResponse.data()); assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, heartbeatRequestState.nextHeartbeatMs(time.milliseconds())); break; @@ -547,7 +547,7 @@ public class HeartbeatRequestManagerTest { .setMemberEpoch(1) .setAssignment(assignmentTopic1)); when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, "topic1")); - membershipManager.onHeartbeatResponseReceived(rs1.data()); + membershipManager.onHeartbeatSuccess(rs1.data()); // We remain in RECONCILING state, as the assignment will be reconciled on the next poll assertEquals(MemberState.RECONCILING, membershipManager.state()); @@ -712,7 +712,7 @@ public class HeartbeatRequestManagerTest { 
.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) .setMemberId(memberId) .setMemberEpoch(memberEpoch)); - membershipManager.onHeartbeatResponseReceived(rs1.data()); + membershipManager.onHeartbeatSuccess(rs1.data()); assertEquals(MemberState.STABLE, membershipManager.state()); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java index 3294068b074..8a0fcf85758 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java @@ -16,12 +16,15 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent; import org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager; +import org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; @@ -102,6 +105,8 @@ public class MembershipManagerImplTest { private BlockingQueue<BackgroundEvent> backgroundEventQueue; private BackgroundEventHandler backgroundEventHandler; private Time time; + private Metrics metrics; + private RebalanceMetricsManager rebalanceMetricsManager; @BeforeEach public void setup() { @@ -111,7 +116,9 @@ public 
class MembershipManagerImplTest { commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); backgroundEventQueue = testBuilder.backgroundEventQueue; backgroundEventHandler = testBuilder.backgroundEventHandler; - time = testBuilder.time; + time = new MockTime(0); + metrics = new Metrics(time); + rebalanceMetricsManager = new RebalanceMetricsManager(metrics); } @AfterEach @@ -135,15 +142,16 @@ public class MembershipManagerImplTest { return spy(new MembershipManagerImpl( GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT, Optional.empty(), subscriptionState, commitRequestManager, metadata, logContext, Optional.empty(), - backgroundEventHandler, time)); + backgroundEventHandler, time, rebalanceMetricsManager)); } private MembershipManagerImpl createMembershipManagerJoiningGroup(String groupInstanceId, String serverAssignor) { - MembershipManagerImpl manager = new MembershipManagerImpl( + MembershipManagerImpl manager = spy(new MembershipManagerImpl( GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT, Optional.ofNullable(serverAssignor), subscriptionState, commitRequestManager, - metadata, logContext, Optional.empty(), backgroundEventHandler, time); + metadata, logContext, Optional.empty(), backgroundEventHandler, time, + rebalanceMetricsManager)); manager.transitionToJoining(); return manager; } @@ -160,7 +168,6 @@ public class MembershipManagerImplTest { @Test public void testMembershipManagerInitSupportsEmptyGroupInstanceId() { createMembershipManagerJoiningGroup(); - createMembershipManagerJoiningGroup(null, null); } @Test @@ -169,7 +176,7 @@ public class MembershipManagerImplTest { MembershipManagerImpl manager = new MembershipManagerImpl( GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(), subscriptionState, commitRequestManager, metadata, logContext, Optional.empty(), - backgroundEventHandler, time); + backgroundEventHandler, time, rebalanceMetricsManager); 
manager.transitionToJoining(); clearInvocations(metadata); @@ -200,12 +207,12 @@ public class MembershipManagerImplTest { ConsumerGroupHeartbeatResponse responseWithoutAssignment = createConsumerGroupHeartbeatResponse(null); - membershipManager.onHeartbeatResponseReceived(responseWithoutAssignment.data()); + membershipManager.onHeartbeatSuccess(responseWithoutAssignment.data()); assertNotEquals(MemberState.RECONCILING, membershipManager.state()); ConsumerGroupHeartbeatResponse responseWithAssignment = createConsumerGroupHeartbeatResponse(createAssignment(true)); - membershipManager.onHeartbeatResponseReceived(responseWithAssignment.data()); + membershipManager.onHeartbeatSuccess(responseWithAssignment.data()); assertEquals(MemberState.RECONCILING, membershipManager.state()); } @@ -213,7 +220,7 @@ public class MembershipManagerImplTest { public void testMemberIdAndEpochResetOnFencedMembers() { MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null); - membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data()); + membershipManager.onHeartbeatSuccess(heartbeatResponse.data()); assertEquals(MemberState.STABLE, membershipManager.state()); assertEquals(MEMBER_ID, membershipManager.memberId()); assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch()); @@ -230,7 +237,7 @@ public class MembershipManagerImplTest { MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null); - membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data()); + membershipManager.onHeartbeatSuccess(heartbeatResponse.data()); assertEquals(MemberState.STABLE, membershipManager.state()); assertEquals(MEMBER_ID, membershipManager.memberId()); assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch()); @@ -246,7 +253,7 @@ public class 
MembershipManagerImplTest { MembershipManagerImpl membershipManager = new MembershipManagerImpl( GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(), subscriptionState, commitRequestManager, metadata, logContext, Optional.empty(), - backgroundEventHandler, time); + backgroundEventHandler, time, rebalanceMetricsManager); assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); membershipManager.transitionToJoining(); @@ -299,7 +306,7 @@ public class MembershipManagerImplTest { membershipManager.registerStateListener(listener); int epoch = 5; - membershipManager.onHeartbeatResponseReceived(new ConsumerGroupHeartbeatResponseData() + membershipManager.onHeartbeatSuccess(new ConsumerGroupHeartbeatResponseData() .setErrorCode(Errors.NONE.code()) .setMemberId(MEMBER_ID) .setMemberEpoch(epoch)); @@ -307,7 +314,7 @@ public class MembershipManagerImplTest { verify(listener).onMemberEpochUpdated(Optional.of(epoch), Optional.of(MEMBER_ID)); clearInvocations(listener); - membershipManager.onHeartbeatResponseReceived(new ConsumerGroupHeartbeatResponseData() + membershipManager.onHeartbeatSuccess(new ConsumerGroupHeartbeatResponseData() .setErrorCode(Errors.NONE.code()) .setMemberId(MEMBER_ID) .setMemberEpoch(epoch)); @@ -316,7 +323,7 @@ public class MembershipManagerImplTest { private void mockStableMember(MembershipManagerImpl membershipManager) { ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null); - membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data()); + membershipManager.onHeartbeatSuccess(heartbeatResponse.data()); assertEquals(MemberState.STABLE, membershipManager.state()); assertEquals(MEMBER_ID, membershipManager.memberId()); assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch()); @@ -709,7 +716,7 @@ public class MembershipManagerImplTest { CompletableFuture<Void> leaveResult = membershipManager.leaveGroup(); - 
membershipManager.onHeartbeatResponseReceived(createConsumerGroupHeartbeatResponse(createAssignment(true)).data()); + membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(createAssignment(true)).data()); assertEquals(MemberState.LEAVING, membershipManager.state()); assertEquals(-1, membershipManager.memberEpoch()); @@ -726,7 +733,7 @@ public class MembershipManagerImplTest { when(membershipManager.state()).thenReturn(state); ConsumerGroupHeartbeatResponseData responseData = mock(ConsumerGroupHeartbeatResponseData.class); - membershipManager.onHeartbeatResponseReceived(responseData); + membershipManager.onHeartbeatSuccess(responseData); assertEquals(state, membershipManager.state()); verify(responseData, never()).memberId(); @@ -861,7 +868,7 @@ public class MembershipManagerImplTest { public void testFatalFailureWhenStateIsStable() { MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null); - membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data()); + membershipManager.onHeartbeatSuccess(heartbeatResponse.data()); assertEquals(MemberState.STABLE, membershipManager.state()); testStateUpdateOnFatalFailure(membershipManager); @@ -930,9 +937,9 @@ public class MembershipManagerImplTest { // Updating state with a heartbeat response containing errors cannot be performed and // should fail. ConsumerGroupHeartbeatResponse unknownMemberResponse = - createConsumerGroupHeartbeatResponseWithError(); + createConsumerGroupHeartbeatResponseWithError(Errors.UNKNOWN_MEMBER_ID); assertThrows(IllegalArgumentException.class, - () -> membershipManager.onHeartbeatResponseReceived(unknownMemberResponse.data())); + () -> membershipManager.onHeartbeatSuccess(unknownMemberResponse.data())); } /** @@ -1099,7 +1106,7 @@ public class MembershipManagerImplTest { // Target assignment received again with the same unresolved topic. 
Client should keep it // as unresolved. clearInvocations(subscriptionState); - membershipManager.onHeartbeatResponseReceived(createConsumerGroupHeartbeatResponse(assignment).data()); + membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment).data()); assertEquals(MemberState.RECONCILING, membershipManager.state()); assertEquals(Collections.singleton(topic2), membershipManager.topicsAwaitingReconciliation()); verify(subscriptionState, never()).assignFromSubscribed(anyCollection()); @@ -1173,6 +1180,9 @@ public class MembershipManagerImplTest { verify(subscriptionState, never()).assignFromSubscribed(anyCollection()); assertEquals(MemberState.STABLE, membershipManager.state()); + + assertEquals(1.0d, getMetricValue(metrics, rebalanceMetricsManager.rebalanceTotal)); + assertEquals(0.0d, getMetricValue(metrics, rebalanceMetricsManager.failedRebalanceTotal)); } @Test @@ -1376,7 +1386,6 @@ public class MembershipManagerImplTest { @Test public void testListenerCallbacksBasic() { - // Step 1: set up mocks MembershipManagerImpl membershipManager = createMemberInStableState(); CounterConsumerRebalanceListener listener = new CounterConsumerRebalanceListener(); ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker(); @@ -1685,7 +1694,7 @@ public class MembershipManagerImplTest { public void testTransitionToLeavingWhileStableDueToStaleMember() { MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null); - membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data()); + membershipManager.onHeartbeatSuccess(heartbeatResponse.data()); doNothing().when(subscriptionState).assignFromSubscribed(any()); assertEquals(MemberState.STABLE, membershipManager.state()); assertLeaveGroupDueToExpiredPollAndTransitionToStale(membershipManager); @@ -1832,8 +1841,8 @@ public class MembershipManagerImplTest { return new
ConsumerRebalanceListenerInvoker( new LogContext(), subscriptionState, - new MockTime(1), - new RebalanceCallbackMetricsManager(new Metrics()) + time, + new RebalanceCallbackMetricsManager(new Metrics(time)) ); } @@ -1930,7 +1939,7 @@ public class MembershipManagerImplTest { @Test public void testMemberJoiningTransitionsToStableWhenReceivingEmptyAssignment() { - MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(null); + MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); assertEquals(MemberState.JOINING, membershipManager.state()); receiveEmptyAssignment(membershipManager); @@ -1940,6 +1949,144 @@ public class MembershipManagerImplTest { assertEquals(MemberState.STABLE, membershipManager.state()); } + @Test + public void testMetricsWhenHeartbeatFailed() { + MembershipManagerImpl membershipManager = createMemberInStableState(); + membershipManager.onHeartbeatFailure(); + + // Not expecting rebalance failures without assignments being reconciled + assertEquals(0.0d, getMetricValue(metrics, rebalanceMetricsManager.rebalanceTotal)); + assertEquals(0.0d, getMetricValue(metrics, rebalanceMetricsManager.failedRebalanceTotal)); + } + + @Test + public void testRebalanceMetricsOnSuccessfulRebalance() { + MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); + ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null); + membershipManager.onHeartbeatSuccess(heartbeatResponse.data()); + mockOwnedPartition(membershipManager, Uuid.randomUuid(), "topic1"); + + CompletableFuture<Void> commitResult = mockRevocationNoCallbacks(true); + + receiveEmptyAssignment(membershipManager); + long reconciliationDurationMs = 1234; + time.sleep(reconciliationDurationMs); + + membershipManager.poll(time.milliseconds()); + // Complete commit request to complete the callback invocation + commitResult.complete(null); + + assertEquals((double) reconciliationDurationMs,
getMetricValue(metrics, rebalanceMetricsManager.rebalanceLatencyTotal)); + assertEquals((double) reconciliationDurationMs, getMetricValue(metrics, rebalanceMetricsManager.rebalanceLatencyAvg)); + assertEquals((double) reconciliationDurationMs, getMetricValue(metrics, rebalanceMetricsManager.rebalanceLatencyMax)); + assertEquals(1d, getMetricValue(metrics, rebalanceMetricsManager.rebalanceTotal)); + assertEquals(120d, (double) getMetricValue(metrics, rebalanceMetricsManager.rebalanceRatePerHour), 1d); + assertEquals(0d, getMetricValue(metrics, rebalanceMetricsManager.failedRebalanceRate)); + assertEquals(0d, getMetricValue(metrics, rebalanceMetricsManager.failedRebalanceTotal)); + assertEquals(0d, getMetricValue(metrics, rebalanceMetricsManager.lastRebalanceSecondsAgo)); + } + + @Test + public void testRebalanceMetricsForMultipleReconciliations() { + MembershipManagerImpl membershipManager = createMemberInStableState(); + ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker(); + + String topicName = "topic1"; + Uuid topicId = Uuid.randomUuid(); + + SleepyRebalanceListener listener = new SleepyRebalanceListener(1453, time); + when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet()); + when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true); + when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener)); + doNothing().when(subscriptionState).markPendingRevocation(anySet()); + when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName)); + + when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName)); + receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager); + + membershipManager.poll(time.milliseconds()); + + // assign partitions + performCallback( + membershipManager, + invoker, + ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED, + topicPartitions(topicName, 0, 1), + true + ); + + long firstRebalanceTimeMs =
listener.sleepMs; + listener.reset(); + + // ack + membershipManager.onHeartbeatRequestSent(); + + // revoke all + when(subscriptionState.assignedPartitions()).thenReturn(topicPartitions(topicName, 0, 1)); + receiveAssignment(topicId, Collections.singletonList(2), membershipManager); + + membershipManager.poll(time.milliseconds()); + + performCallback( + membershipManager, + invoker, + ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED, + topicPartitions(topicName, 0, 1), + true + ); + + // assign new partition 2 + performCallback( + membershipManager, + invoker, + ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED, + topicPartitions(topicName, 2), + true + ); + membershipManager.onHeartbeatRequestSent(); + + long secondRebalanceMs = listener.sleepMs; + long total = firstRebalanceTimeMs + secondRebalanceMs; + double avg = total / 2.0d; + long max = Math.max(firstRebalanceTimeMs, secondRebalanceMs); + assertEquals((double) total, getMetricValue(metrics, rebalanceMetricsManager.rebalanceLatencyTotal)); + assertEquals(avg, (double) getMetricValue(metrics, rebalanceMetricsManager.rebalanceLatencyAvg), 1d); + assertEquals((double) max, getMetricValue(metrics, rebalanceMetricsManager.rebalanceLatencyMax)); + assertEquals(2d, getMetricValue(metrics, rebalanceMetricsManager.rebalanceTotal)); + // rate is not tested because it is subject to Rate implementation + assertEquals(0d, getMetricValue(metrics, rebalanceMetricsManager.failedRebalanceRate)); + assertEquals(0d, getMetricValue(metrics, rebalanceMetricsManager.failedRebalanceTotal)); + assertEquals(0d, getMetricValue(metrics, rebalanceMetricsManager.lastRebalanceSecondsAgo)); + + } + + @Test + public void testRebalanceMetricsOnFailedRebalance() { + MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); + ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null); + membershipManager.onHeartbeatSuccess(heartbeatResponse.data()); + + Uuid
topicId = Uuid.randomUuid(); + + receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager); + + // sleep for an arbitrary amount + time.sleep(2300); + + assertTrue(rebalanceMetricsManager.rebalanceStarted()); + membershipManager.onHeartbeatFailure(); + + assertEquals((double) 0, getMetricValue(metrics, rebalanceMetricsManager.rebalanceLatencyTotal)); + assertEquals(0d, getMetricValue(metrics, rebalanceMetricsManager.rebalanceTotal)); + assertEquals(120d, getMetricValue(metrics, rebalanceMetricsManager.failedRebalanceRate)); + assertEquals(1d, getMetricValue(metrics, rebalanceMetricsManager.failedRebalanceTotal)); + assertEquals(-1d, getMetricValue(metrics, rebalanceMetricsManager.lastRebalanceSecondsAgo)); + } + + private Object getMetricValue(Metrics metrics, MetricName name) { + return metrics.metrics().get(name).metricValue(); + } + private MembershipManagerImpl mockMemberSuccessfullyReceivesAndAcksAssignment( Uuid topicId, String topicName, List<Integer> partitions) { MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); @@ -2119,7 +2266,7 @@ public class MembershipManagerImplTest { when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true); when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty()).thenReturn(Optional.empty()); - membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data()); + membershipManager.onHeartbeatSuccess(heartbeatResponse.data()); membershipManager.poll(time.milliseconds()); if (expectSubscriptionUpdated) { @@ -2136,9 +2283,9 @@ public class MembershipManagerImplTest { } private MembershipManagerImpl createMemberInStableState(String groupInstanceId) { - MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(groupInstanceId); + MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(groupInstanceId, null); ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null); -
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data()); + membershipManager.onHeartbeatSuccess(heartbeatResponse.data()); assertEquals(MemberState.STABLE, membershipManager.state()); return membershipManager; } @@ -2150,7 +2297,7 @@ public class MembershipManagerImplTest { .setTopicId(tp.getKey()) .setPartitions(new ArrayList<>(tp.getValue()))).collect(Collectors.toList())); ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(targetAssignment); - membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data()); + membershipManager.onHeartbeatSuccess(heartbeatResponse.data()); } private void receiveAssignment(Uuid topicId, List<Integer> partitions, MembershipManager membershipManager) { @@ -2160,7 +2307,7 @@ public class MembershipManagerImplTest { .setTopicId(topicId) .setPartitions(partitions))); ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(targetAssignment); - membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data()); + membershipManager.onHeartbeatSuccess(heartbeatResponse.data()); } private void receiveAssignmentAfterRejoin(Uuid topicId, List<Integer> partitions, MembershipManager membershipManager) { @@ -2171,7 +2318,7 @@ public class MembershipManagerImplTest { .setPartitions(partitions))); ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponseWithBumpedEpoch(targetAssignment); - membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data()); + membershipManager.onHeartbeatSuccess(heartbeatResponse.data()); } private void receiveEmptyAssignment(MembershipManager membershipManager) { @@ -2179,7 +2326,7 @@ public class MembershipManagerImplTest { ConsumerGroupHeartbeatResponseData.Assignment targetAssignment = new ConsumerGroupHeartbeatResponseData.Assignment() .setTopicPartitions(Collections.emptyList()); ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(targetAssignment); - membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data()); + membershipManager.onHeartbeatSuccess(heartbeatResponse.data()); } /** @@ -2307,9 +2454,9 @@ public class MembershipManagerImplTest { .setAssignment(assignment)); } - private ConsumerGroupHeartbeatResponse createConsumerGroupHeartbeatResponseWithError() { + private ConsumerGroupHeartbeatResponse createConsumerGroupHeartbeatResponseWithError(Errors error) { return new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() - .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) + .setErrorCode(error.code()) .setMemberId(MEMBER_ID) .setMemberEpoch(5)); } @@ -2356,4 +2503,29 @@ public class MembershipManagerImplTest { Arguments.of(MemberState.STALE)); } + private static class SleepyRebalanceListener implements ConsumerRebalanceListener { + private long sleepMs; + private final long sleepDurationMs; + private final Time time; + SleepyRebalanceListener(long sleepDurationMs, Time time) { + this.sleepDurationMs = sleepDurationMs; + this.time = time; + } + + @Override + public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + sleepMs += sleepDurationMs; + time.sleep(sleepDurationMs); + } + + @Override + public void onPartitionsAssigned(Collection<TopicPartition> partitions) { + sleepMs += sleepDurationMs; + time.sleep(sleepDurationMs); + } + + public void reset() { + sleepMs = 0; + } + } }