This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 53c41aca7ba KAFKA-16116: Rebalance Metrics for AsyncKafkaConsumer 
(#15339)
53c41aca7ba is described below

commit 53c41aca7ba9469a0145023112f5fad254da4fa8
Author: Philip Nee <p...@confluent.io>
AuthorDate: Wed Feb 28 01:52:01 2024 -0800

    KAFKA-16116: Rebalance Metrics for AsyncKafkaConsumer (#15339)
    
    Adding the following rebalance metrics to the consumer:
    
    rebalance-latency-avg
    rebalance-latency-max
    rebalance-latency-total
    rebalance-rate-per-hour
    rebalance-total
    failed-rebalance-rate-per-hour
    failed-rebalance-total
    
    Due to the differences between the protocols, we need to redefine when a
    rebalance starts and ends.
    Start of Rebalance:
    Current: Right before sending out JoinGroup
    ConsumerGroup: When the client receives assignments from the HB
    
    End of Rebalance - Successful Case:
    Current: Receiving the SyncGroup response after transitioning to
    "COMPLETING_REBALANCE"
    ConsumerGroup: After completing reconciliation and right before sending out 
"Ack" heartbeat
    
    End of Rebalance - Failed Case:
    Current: Any failure in the JoinGroup/SyncGroup response
    ConsumerGroup: Failure in the heartbeat
    
    Note: After all, we try to be consistent with the current protocol.
    Rebalances start and end with sending and receiving network requests, and
    failures in those network requests signal rebalance failures to the user.
    It is therefore entirely possible to have multiple failures before having a
    successful one.
    
    Reviewers: Lucas Brutschy <lbruts...@confluent.io>
---
 .../internals/HeartbeatRequestManager.java         |   3 +-
 .../consumer/internals/MembershipManager.java      |   7 +-
 .../consumer/internals/MembershipManagerImpl.java  |  62 +++++-
 .../consumer/internals/RequestManagers.java        |   3 +-
 .../internals/metrics/RebalanceMetricsManager.java | 114 ++++++++++
 .../consumer/internals/ConsumerTestBuilder.java    |  26 +--
 .../internals/HeartbeatRequestManagerTest.java     |   8 +-
 .../internals/MembershipManagerImplTest.java       | 240 ++++++++++++++++++---
 8 files changed, 408 insertions(+), 55 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
index 550e5b92ebf..d551dbe2508 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -327,7 +327,7 @@ public class HeartbeatRequestManager implements 
RequestManager {
             
heartbeatRequestState.updateHeartbeatIntervalMs(response.data().heartbeatIntervalMs());
             heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
             heartbeatRequestState.resetTimer();
-            membershipManager.onHeartbeatResponseReceived(response.data());
+            membershipManager.onHeartbeatSuccess(response.data());
             maybeSendGroupMetadataUpdateEvent();
             return;
         }
@@ -357,6 +357,7 @@ public class HeartbeatRequestManager implements 
RequestManager {
 
         this.heartbeatState.reset();
         this.heartbeatRequestState.onFailedAttempt(currentTimeMs);
+        membershipManager.onHeartbeatFailure();
 
         switch (error) {
             case NOT_COORDINATOR:
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
index a9c23d7b4d5..f0a641d140f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
@@ -68,7 +68,12 @@ public interface MembershipManager extends RequestManager {
      *
      * @param response Heartbeat response to extract member info and errors 
from.
      */
-    void onHeartbeatResponseReceived(ConsumerGroupHeartbeatResponseData 
response);
+    void onHeartbeatSuccess(ConsumerGroupHeartbeatResponseData response);
+
+    /**
+     * Notify the member that an error heartbeat response was received.
+     */
+    void onHeartbeatFailure();
 
     /**
      * Update state when a heartbeat is sent out. This will transition out of 
the states that end
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index a9b0f3b94d8..6f3947eea46 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -27,11 +27,13 @@ import 
org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundE
 import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
 import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import 
org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
 import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider;
@@ -263,6 +265,11 @@ public class MembershipManagerImpl implements 
MembershipManager {
      * when the timer is reset, only when it completes releasing its 
assignment.
      */
     private CompletableFuture<Void> staleMemberAssignmentRelease;
+    
+    /*
+     * Measures successful rebalance latency and number of failed rebalances.
+     */
+    private final RebalanceMetricsManager metricsManager;
 
     private final Time time;
 
@@ -284,7 +291,35 @@ public class MembershipManagerImpl implements 
MembershipManager {
                                  LogContext logContext,
                                  Optional<ClientTelemetryReporter> 
clientTelemetryReporter,
                                  BackgroundEventHandler backgroundEventHandler,
-                                 Time time) {
+                                 Time time,
+                                 Metrics metrics) {
+        this(groupId,
+            groupInstanceId,
+            rebalanceTimeoutMs,
+            serverAssignor,
+            subscriptions,
+            commitRequestManager,
+            metadata,
+            logContext,
+            clientTelemetryReporter,
+            backgroundEventHandler,
+            time,
+            new RebalanceMetricsManager(metrics));
+    }
+
+    // Visible for testing
+    MembershipManagerImpl(String groupId,
+                          Optional<String> groupInstanceId,
+                          int rebalanceTimeoutMs,
+                          Optional<String> serverAssignor,
+                          SubscriptionState subscriptions,
+                          CommitRequestManager commitRequestManager,
+                          ConsumerMetadata metadata,
+                          LogContext logContext,
+                          Optional<ClientTelemetryReporter> 
clientTelemetryReporter,
+                          BackgroundEventHandler backgroundEventHandler,
+                          Time time,
+                          RebalanceMetricsManager metricsManager) {
         this.groupId = groupId;
         this.state = MemberState.UNSUBSCRIBED;
         this.serverAssignor = serverAssignor;
@@ -301,6 +336,7 @@ public class MembershipManagerImpl implements 
MembershipManager {
         this.rebalanceTimeoutMs = rebalanceTimeoutMs;
         this.backgroundEventHandler = backgroundEventHandler;
         this.time = time;
+        this.metricsManager = metricsManager;
     }
 
     /**
@@ -314,10 +350,27 @@ public class MembershipManagerImpl implements 
MembershipManager {
             throw new IllegalStateException(String.format("Invalid state 
transition from %s to %s",
                     state, nextState));
         }
+
+        if (isCompletingRebalance(state, nextState)) {
+            metricsManager.recordRebalanceEnded(time.milliseconds());
+        }
+        if (isStartingRebalance(state, nextState)) {
+            metricsManager.recordRebalanceStarted(time.milliseconds());
+        }
+
         log.trace("Member {} with epoch {} transitioned from {} to {}.", 
memberId, memberEpoch, state, nextState);
         this.state = nextState;
     }
 
+    private static boolean isCompletingRebalance(MemberState currentState, 
MemberState nextState) {
+        return currentState == MemberState.RECONCILING &&
+            (nextState == MemberState.STABLE || nextState == 
MemberState.ACKNOWLEDGING);
+    }
+
+    private static boolean isStartingRebalance(MemberState currentState, 
MemberState nextState) {
+        return currentState != MemberState.RECONCILING && nextState == 
MemberState.RECONCILING;
+    }
+
     /**
      * {@inheritDoc}
      */
@@ -354,7 +407,7 @@ public class MembershipManagerImpl implements 
MembershipManager {
      * {@inheritDoc}
      */
     @Override
-    public void onHeartbeatResponseReceived(ConsumerGroupHeartbeatResponseData 
response) {
+    public void onHeartbeatSuccess(ConsumerGroupHeartbeatResponseData 
response) {
         if (response.errorCode() != Errors.NONE.code()) {
             String errorMessage = String.format(
                     "Unexpected error in Heartbeat response. Expected no 
error, but received: %s",
@@ -403,6 +456,11 @@ public class MembershipManagerImpl implements 
MembershipManager {
         }
     }
 
+    @Override
+    public void onHeartbeatFailure() {
+        metricsManager.maybeRecordRebalanceFailed();
+    }
+
     /**
      * @return True if the consumer is not a member of the group.
      */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 2d90a3ad708..0b4c043d4a4 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -189,7 +189,8 @@ public class RequestManagers implements Closeable {
                             logContext,
                             clientTelemetryReporter,
                             backgroundEventHandler,
-                            time);
+                            time,
+                            metrics);
                     membershipManager.registerStateListener(commit);
                     heartbeatRequestManager = new HeartbeatRequestManager(
                             logContext,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java
new file mode 100644
index 00000000000..a255487f37a
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.metrics;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX;
+
+public class RebalanceMetricsManager {
+    private final Sensor successfulRebalanceSensor;
+    private final Sensor failedRebalanceSensor;
+    private final String metricGroupName;
+
+    public final MetricName rebalanceLatencyAvg;
+    public final MetricName rebalanceLatencyMax;
+    public final MetricName rebalanceLatencyTotal;
+    public final MetricName rebalanceTotal;
+    public final MetricName rebalanceRatePerHour;
+    public final MetricName lastRebalanceSecondsAgo;
+    public final MetricName failedRebalanceTotal;
+    public final MetricName failedRebalanceRate;
+    private long lastRebalanceEndMs = -1L;
+    private long lastRebalanceStartMs = -1L;
+
+    public RebalanceMetricsManager(Metrics metrics) {
+        metricGroupName = CONSUMER_METRIC_GROUP_PREFIX + 
COORDINATOR_METRICS_SUFFIX;
+
+        rebalanceLatencyAvg = createMetric(metrics, "rebalance-latency-avg",
+            "The average time taken for a group to complete a rebalance");
+        rebalanceLatencyMax = createMetric(metrics, "rebalance-latency-max",
+            "The max time taken for a group to complete a rebalance");
+        rebalanceLatencyTotal = createMetric(metrics, 
"rebalance-latency-total",
+            "The total number of milliseconds spent in rebalances");
+        rebalanceTotal = createMetric(metrics, "rebalance-total",
+            "The total number of rebalance events");
+        rebalanceRatePerHour = createMetric(metrics, "rebalance-rate-per-hour",
+            "The number of rebalance events per hour");
+        failedRebalanceTotal = createMetric(metrics, "failed-rebalance-total",
+            "The total number of failed rebalance events");
+        failedRebalanceRate = createMetric(metrics, 
"failed-rebalance-rate-per-hour",
+            "The number of failed rebalance events per hour");
+
+        successfulRebalanceSensor = metrics.sensor("rebalance-latency");
+        successfulRebalanceSensor.add(rebalanceLatencyAvg, new Avg());
+        successfulRebalanceSensor.add(rebalanceLatencyMax, new Max());
+        successfulRebalanceSensor.add(rebalanceLatencyTotal, new 
CumulativeSum());
+        successfulRebalanceSensor.add(rebalanceTotal, new CumulativeCount());
+        successfulRebalanceSensor.add(rebalanceRatePerHour, new 
Rate(TimeUnit.HOURS, new WindowedCount()));
+
+        failedRebalanceSensor = metrics.sensor("failed-rebalance");
+        failedRebalanceSensor.add(failedRebalanceTotal, new CumulativeSum());
+        failedRebalanceSensor.add(failedRebalanceRate, new 
Rate(TimeUnit.HOURS, new WindowedCount()));
+
+        Measurable lastRebalance = (config, now) -> {
+            if (lastRebalanceEndMs == -1L)
+                return -1d;
+            else
+                return TimeUnit.SECONDS.convert(now - lastRebalanceEndMs, 
TimeUnit.MILLISECONDS);
+        };
+        lastRebalanceSecondsAgo = createMetric(metrics,
+            "last-rebalance-seconds-ago",
+            "The number of seconds since the last rebalance event");
+        metrics.addMetric(lastRebalanceSecondsAgo, lastRebalance);
+    }
+
+    private MetricName createMetric(Metrics metrics, String name, String 
description) {
+        return metrics.metricName(name, metricGroupName, description);
+    }
+
+    public void recordRebalanceStarted(long nowMs) {
+        lastRebalanceStartMs = nowMs;
+    }
+
+    public void recordRebalanceEnded(long nowMs) {
+        lastRebalanceEndMs = nowMs;
+        successfulRebalanceSensor.record(nowMs - lastRebalanceStartMs);
+    }
+
+    public void maybeRecordRebalanceFailed() {
+        if (lastRebalanceStartMs <= lastRebalanceEndMs)
+            return;
+        failedRebalanceSensor.record();
+    }
+
+    public boolean rebalanceStarted() {
+        return lastRebalanceStartMs > lastRebalanceEndMs;
+    }
+}
\ No newline at end of file
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
index d6ae6295060..d62d3d8a35e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
@@ -26,6 +26,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProces
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
+import 
org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.requests.MetadataResponse;
@@ -196,18 +197,19 @@ public class ConsumerTestBuilder implements Closeable {
                     gi.groupInstanceId,
                     metrics));
             MembershipManager mm = spy(
-                    new MembershipManagerImpl(
-                        gi.groupId,
-                        gi.groupInstanceId,
-                        groupRebalanceConfig.rebalanceTimeoutMs,
-                        gi.serverAssignor,
-                        subscriptions,
-                        commit,
-                        metadata,
-                        logContext,
-                        Optional.empty(),
-                        backgroundEventHandler,
-                        time
+                new MembershipManagerImpl(
+                    gi.groupId,
+                    gi.groupInstanceId,
+                    groupRebalanceConfig.rebalanceTimeoutMs,
+                    gi.serverAssignor,
+                    subscriptions,
+                    commit,
+                    metadata,
+                    logContext,
+                    Optional.empty(),
+                    backgroundEventHandler,
+                    time,
+                    mock(RebalanceMetricsManager.class)
                 )
             );
             HeartbeatRequestManager.HeartbeatState heartbeatState = spy(new 
HeartbeatRequestManager.HeartbeatState(
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
index 5cf5b9e2d92..4d4492bcb47 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -306,7 +306,7 @@ public class HeartbeatRequestManagerTest {
             new ConsumerGroupHeartbeatResponse(new 
ConsumerGroupHeartbeatResponseData()
             .setMemberId(memberId)
             .setMemberEpoch(memberEpoch));
-        membershipManager.onHeartbeatResponseReceived(result.data());
+        membershipManager.onHeartbeatSuccess(result.data());
 
         // Create a ConsumerHeartbeatRequest and verify the payload
         NetworkClientDelegate.PollResult pollResult = 
heartbeatRequestManager.poll(time.milliseconds());
@@ -441,7 +441,7 @@ public class HeartbeatRequestManagerTest {
         switch (error) {
             case NONE:
                 
verify(backgroundEventHandler).add(any(GroupMetadataUpdateEvent.class));
-                verify(membershipManager, 
times(2)).onHeartbeatResponseReceived(mockResponse.data());
+                verify(membershipManager, 
times(2)).onHeartbeatSuccess(mockResponse.data());
                 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, 
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()));
                 break;
 
@@ -547,7 +547,7 @@ public class HeartbeatRequestManagerTest {
                 .setMemberEpoch(1)
                 .setAssignment(assignmentTopic1));
         
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, 
"topic1"));
-        membershipManager.onHeartbeatResponseReceived(rs1.data());
+        membershipManager.onHeartbeatSuccess(rs1.data());
 
         // We remain in RECONCILING state, as the assignment will be 
reconciled on the next poll
         assertEquals(MemberState.RECONCILING, membershipManager.state());
@@ -712,7 +712,7 @@ public class HeartbeatRequestManagerTest {
                 .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
                 .setMemberId(memberId)
                 .setMemberEpoch(memberEpoch));
-        membershipManager.onHeartbeatResponseReceived(rs1.data());
+        membershipManager.onHeartbeatSuccess(rs1.data());
         assertEquals(MemberState.STABLE, membershipManager.state());
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index 3294068b074..8a0fcf85758 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -16,12 +16,15 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
+import 
org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
@@ -102,6 +105,8 @@ public class MembershipManagerImplTest {
     private BlockingQueue<BackgroundEvent> backgroundEventQueue;
     private BackgroundEventHandler backgroundEventHandler;
     private Time time;
+    private Metrics metrics;
+    private RebalanceMetricsManager rebalanceMetricsManager;
 
     @BeforeEach
     public void setup() {
@@ -111,7 +116,9 @@ public class MembershipManagerImplTest {
         commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
         backgroundEventQueue = testBuilder.backgroundEventQueue;
         backgroundEventHandler = testBuilder.backgroundEventHandler;
-        time = testBuilder.time;
+        time = new MockTime(0);
+        metrics = new Metrics(time);
+        rebalanceMetricsManager = new RebalanceMetricsManager(metrics);
     }
 
     @AfterEach
@@ -135,15 +142,16 @@ public class MembershipManagerImplTest {
         return spy(new MembershipManagerImpl(
             GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT, 
Optional.empty(),
             subscriptionState, commitRequestManager, metadata, logContext, 
Optional.empty(),
-            backgroundEventHandler, time));
+            backgroundEventHandler, time, rebalanceMetricsManager));
     }
 
     private MembershipManagerImpl createMembershipManagerJoiningGroup(String 
groupInstanceId,
                                                                       String 
serverAssignor) {
-        MembershipManagerImpl manager = new MembershipManagerImpl(
+        MembershipManagerImpl manager = spy(new MembershipManagerImpl(
                 GROUP_ID, Optional.ofNullable(groupInstanceId), 
REBALANCE_TIMEOUT,
                 Optional.ofNullable(serverAssignor), subscriptionState, 
commitRequestManager,
-                metadata, logContext, Optional.empty(), 
backgroundEventHandler, time);
+                metadata, logContext, Optional.empty(), 
backgroundEventHandler, time,
+                rebalanceMetricsManager));
         manager.transitionToJoining();
         return manager;
     }
@@ -160,7 +168,6 @@ public class MembershipManagerImplTest {
     @Test
     public void testMembershipManagerInitSupportsEmptyGroupInstanceId() {
         createMembershipManagerJoiningGroup();
-        createMembershipManagerJoiningGroup(null, null);
     }
 
     @Test
@@ -169,7 +176,7 @@ public class MembershipManagerImplTest {
         MembershipManagerImpl manager = new MembershipManagerImpl(
                 GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, 
Optional.empty(),
                 subscriptionState, commitRequestManager, metadata, logContext, 
Optional.empty(),
-                backgroundEventHandler, time);
+                backgroundEventHandler, time, rebalanceMetricsManager);
         manager.transitionToJoining();
         clearInvocations(metadata);
 
@@ -200,12 +207,12 @@ public class MembershipManagerImplTest {
 
         ConsumerGroupHeartbeatResponse responseWithoutAssignment =
                 createConsumerGroupHeartbeatResponse(null);
-        
membershipManager.onHeartbeatResponseReceived(responseWithoutAssignment.data());
+        membershipManager.onHeartbeatSuccess(responseWithoutAssignment.data());
         assertNotEquals(MemberState.RECONCILING, membershipManager.state());
 
         ConsumerGroupHeartbeatResponse responseWithAssignment =
                 createConsumerGroupHeartbeatResponse(createAssignment(true));
-        
membershipManager.onHeartbeatResponseReceived(responseWithAssignment.data());
+        membershipManager.onHeartbeatSuccess(responseWithAssignment.data());
         assertEquals(MemberState.RECONCILING, membershipManager.state());
     }
 
@@ -213,7 +220,7 @@ public class MembershipManagerImplTest {
     public void testMemberIdAndEpochResetOnFencedMembers() {
         MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
         ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(null);
-        
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+        membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
         assertEquals(MemberState.STABLE, membershipManager.state());
         assertEquals(MEMBER_ID, membershipManager.memberId());
         assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
@@ -230,7 +237,7 @@ public class MembershipManagerImplTest {
         MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
         ConsumerGroupHeartbeatResponse heartbeatResponse =
                 createConsumerGroupHeartbeatResponse(null);
-        
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+        membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
         assertEquals(MemberState.STABLE, membershipManager.state());
         assertEquals(MEMBER_ID, membershipManager.memberId());
         assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
@@ -246,7 +253,7 @@ public class MembershipManagerImplTest {
         MembershipManagerImpl membershipManager = new MembershipManagerImpl(
                 GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, 
Optional.empty(),
                 subscriptionState, commitRequestManager, metadata, logContext, 
Optional.empty(),
-                backgroundEventHandler, time);
+                backgroundEventHandler, time, rebalanceMetricsManager);
         assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
         membershipManager.transitionToJoining();
 
@@ -299,7 +306,7 @@ public class MembershipManagerImplTest {
         membershipManager.registerStateListener(listener);
         int epoch = 5;
 
-        membershipManager.onHeartbeatResponseReceived(new 
ConsumerGroupHeartbeatResponseData()
+        membershipManager.onHeartbeatSuccess(new 
ConsumerGroupHeartbeatResponseData()
                 .setErrorCode(Errors.NONE.code())
                 .setMemberId(MEMBER_ID)
                 .setMemberEpoch(epoch));
@@ -307,7 +314,7 @@ public class MembershipManagerImplTest {
         verify(listener).onMemberEpochUpdated(Optional.of(epoch), 
Optional.of(MEMBER_ID));
         clearInvocations(listener);
 
-        membershipManager.onHeartbeatResponseReceived(new 
ConsumerGroupHeartbeatResponseData()
+        membershipManager.onHeartbeatSuccess(new 
ConsumerGroupHeartbeatResponseData()
                 .setErrorCode(Errors.NONE.code())
                 .setMemberId(MEMBER_ID)
                 .setMemberEpoch(epoch));
@@ -316,7 +323,7 @@ public class MembershipManagerImplTest {
 
     private void mockStableMember(MembershipManagerImpl membershipManager) {
         ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(null);
-        
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+        membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
         assertEquals(MemberState.STABLE, membershipManager.state());
         assertEquals(MEMBER_ID, membershipManager.memberId());
         assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
@@ -709,7 +716,7 @@ public class MembershipManagerImplTest {
 
         CompletableFuture<Void> leaveResult = membershipManager.leaveGroup();
 
-        
membershipManager.onHeartbeatResponseReceived(createConsumerGroupHeartbeatResponse(createAssignment(true)).data());
+        
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(createAssignment(true)).data());
 
         assertEquals(MemberState.LEAVING, membershipManager.state());
         assertEquals(-1, membershipManager.memberEpoch());
@@ -726,7 +733,7 @@ public class MembershipManagerImplTest {
         when(membershipManager.state()).thenReturn(state);
         ConsumerGroupHeartbeatResponseData responseData = 
mock(ConsumerGroupHeartbeatResponseData.class);
 
-        membershipManager.onHeartbeatResponseReceived(responseData);
+        membershipManager.onHeartbeatSuccess(responseData);
 
         assertEquals(state, membershipManager.state());
         verify(responseData, never()).memberId();
@@ -861,7 +868,7 @@ public class MembershipManagerImplTest {
     public void testFatalFailureWhenStateIsStable() {
         MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
         ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(null);
-        
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+        membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
         assertEquals(MemberState.STABLE, membershipManager.state());
 
         testStateUpdateOnFatalFailure(membershipManager);
@@ -930,9 +937,9 @@ public class MembershipManagerImplTest {
         // Updating state with a heartbeat response containing errors cannot 
be performed and
         // should fail.
         ConsumerGroupHeartbeatResponse unknownMemberResponse =
-                createConsumerGroupHeartbeatResponseWithError();
+                
createConsumerGroupHeartbeatResponseWithError(Errors.UNKNOWN_MEMBER_ID);
         assertThrows(IllegalArgumentException.class,
-                () -> 
membershipManager.onHeartbeatResponseReceived(unknownMemberResponse.data()));
+                () -> 
membershipManager.onHeartbeatSuccess(unknownMemberResponse.data()));
     }
 
     /**
@@ -1099,7 +1106,7 @@ public class MembershipManagerImplTest {
         // Target assignment received again with the same unresolved topic. 
Client should keep it
         // as unresolved.
         clearInvocations(subscriptionState);
-        
membershipManager.onHeartbeatResponseReceived(createConsumerGroupHeartbeatResponse(assignment).data());
+        
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment).data());
         assertEquals(MemberState.RECONCILING, membershipManager.state());
         assertEquals(Collections.singleton(topic2), 
membershipManager.topicsAwaitingReconciliation());
         verify(subscriptionState, 
never()).assignFromSubscribed(anyCollection());
@@ -1173,6 +1180,9 @@ public class MembershipManagerImplTest {
         verify(subscriptionState, 
never()).assignFromSubscribed(anyCollection());
 
         assertEquals(MemberState.STABLE, membershipManager.state());
+
+        assertEquals(1.0d, getMetricValue(metrics, 
rebalanceMetricsManager.rebalanceTotal));
+        assertEquals(0.0d, getMetricValue(metrics, 
rebalanceMetricsManager.failedRebalanceTotal));
     }
 
     @Test
@@ -1376,7 +1386,6 @@ public class MembershipManagerImplTest {
 
     @Test
     public void testListenerCallbacksBasic() {
-        // Step 1: set up mocks
         MembershipManagerImpl membershipManager = createMemberInStableState();
         CounterConsumerRebalanceListener listener = new 
CounterConsumerRebalanceListener();
         ConsumerRebalanceListenerInvoker invoker = 
consumerRebalanceListenerInvoker();
@@ -1685,7 +1694,7 @@ public class MembershipManagerImplTest {
     public void testTransitionToLeavingWhileStableDueToStaleMember() {
         MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
         ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(null);
-        
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+        membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
         doNothing().when(subscriptionState).assignFromSubscribed(any());
         assertEquals(MemberState.STABLE, membershipManager.state());
         
assertLeaveGroupDueToExpiredPollAndTransitionToStale(membershipManager);
@@ -1832,8 +1841,8 @@ public class MembershipManagerImplTest {
         return new ConsumerRebalanceListenerInvoker(
                 new LogContext(),
                 subscriptionState,
-                new MockTime(1),
-                new RebalanceCallbackMetricsManager(new Metrics())
+                time,
+                new RebalanceCallbackMetricsManager(new Metrics(time))
         );
     }
 
@@ -1930,7 +1939,7 @@ public class MembershipManagerImplTest {
 
     @Test
     public void 
testMemberJoiningTransitionsToStableWhenReceivingEmptyAssignment() {
-        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup(null);
+        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
         assertEquals(MemberState.JOINING, membershipManager.state());
         receiveEmptyAssignment(membershipManager);
 
@@ -1940,6 +1949,144 @@ public class MembershipManagerImplTest {
         assertEquals(MemberState.STABLE, membershipManager.state());
     }
 
+    /**
+     * Checks that a heartbeat failure on a member in the STABLE state, with
+     * no assignment being reconciled, leaves both the rebalanceTotal and
+     * failedRebalanceTotal metrics at zero.
+     */
+    @Test
+    public void testMetricsWhenHeartbeatFailed() {
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        membershipManager.onHeartbeatFailure();
+
+        // Not expecting rebalance failures without assignments being 
reconciled
+        assertEquals(0.0d, getMetricValue(metrics, 
rebalanceMetricsManager.rebalanceTotal));
+        assertEquals(0.0d, getMetricValue(metrics, 
rebalanceMetricsManager.failedRebalanceTotal));
+    }
+
+    /**
+     * Takes a joined member that is mocked as owning a partition of
+     * "topic1", delivers an empty target assignment, advances {@code time}
+     * by a known duration, then polls and completes the commit future.
+     * Expects the latency metrics to equal that duration and exactly one
+     * successful (and no failed) rebalance to be recorded.
+     */
+    @Test
+    public void testRebalanceMetricsOnSuccessfulRebalance() {
+        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
+        ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(null);
+        membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
+        mockOwnedPartition(membershipManager, Uuid.randomUuid(), "topic1");
+
+        CompletableFuture<Void> commitResult = mockRevocationNoCallbacks(true);
+
+        receiveEmptyAssignment(membershipManager);
+        long reconciliationDurationMs = 1234;
+        time.sleep(reconciliationDurationMs);
+
+        membershipManager.poll(time.milliseconds());
+        // Complete commit request to complete the callback invocation
+        commitResult.complete(null);
+
+        assertEquals((double) reconciliationDurationMs, 
getMetricValue(metrics, rebalanceMetricsManager.rebalanceLatencyTotal));
+        assertEquals((double) reconciliationDurationMs, 
getMetricValue(metrics, rebalanceMetricsManager.rebalanceLatencyAvg));
+        assertEquals((double) reconciliationDurationMs, 
getMetricValue(metrics, rebalanceMetricsManager.rebalanceLatencyMax));
+        assertEquals(1d, getMetricValue(metrics, 
rebalanceMetricsManager.rebalanceTotal));
+        // Argument order is (expected, actual, delta): the rate metric is
+        // the value under test and 1d is only the tolerance.
+        assertEquals(120d, (double) getMetricValue(metrics, 
rebalanceMetricsManager.rebalanceRatePerHour), 1d);
+        assertEquals(0d, getMetricValue(metrics, 
rebalanceMetricsManager.failedRebalanceRate));
+        assertEquals(0d, getMetricValue(metrics, 
rebalanceMetricsManager.failedRebalanceTotal));
+        assertEquals(0d, getMetricValue(metrics, 
rebalanceMetricsManager.lastRebalanceSecondsAgo));
+    }
+
+    /**
+     * Runs two consecutive reconciliations on a stable member: first
+     * partitions 0 and 1 are assigned, then they are revoked and partition 2
+     * is assigned. The listener advances {@code time} by a fixed amount in
+     * every callback, so the time spent in callbacks during each
+     * reconciliation is known. Expects the latency total/avg/max metrics to
+     * match those two durations and two successful rebalances to be
+     * recorded, with no failures.
+     */
+    @Test
+    public void testRebalanceMetricsForMultipleReconcilations() {
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        ConsumerRebalanceListenerInvoker invoker = 
consumerRebalanceListenerInvoker();
+
+        String topicName = "topic1";
+        Uuid topicId = Uuid.randomUuid();
+
+        SleepyRebalanceListener listener = new SleepyRebalanceListener(1453, 
time);
+        
when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet());
+        when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+        
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+        doNothing().when(subscriptionState).markPendingRevocation(anySet());
+        
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, 
topicName));
+
+        receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
+
+        membershipManager.poll(time.milliseconds());
+
+        // assign partitions
+        performCallback(
+            membershipManager,
+            invoker,
+            ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+            topicPartitions(topicName, 0, 1),
+            true
+        );
+
+        // Time the listener slept during the first reconciliation; reset the
+        // counter so the second reconciliation is measured on its own.
+        long firstRebalanceMs = listener.sleepMs;
+        listener.reset();
+
+        // ack
+        membershipManager.onHeartbeatRequestSent();
+
+        // revoke all
+        
when(subscriptionState.assignedPartitions()).thenReturn(topicPartitions(topicName,
 0, 1));
+        receiveAssignment(topicId, Collections.singletonList(2), 
membershipManager);
+
+        membershipManager.poll(time.milliseconds());
+
+        performCallback(
+            membershipManager,
+            invoker,
+            ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+            topicPartitions(topicName, 0, 1),
+            true
+        );
+
+        // assign new partition 2
+        performCallback(
+            membershipManager,
+            invoker,
+            ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+            topicPartitions(topicName, 2),
+            true
+        );
+        membershipManager.onHeartbeatRequestSent();
+
+        long secondRebalanceMs = listener.sleepMs;
+        long total = firstRebalanceMs + secondRebalanceMs;
+        double avg = total / 2.0d;
+        long max = Math.max(firstRebalanceMs, secondRebalanceMs);
+        assertEquals((double) total, getMetricValue(metrics, 
rebalanceMetricsManager.rebalanceLatencyTotal));
+        assertEquals(avg, (double) getMetricValue(metrics, 
rebalanceMetricsManager.rebalanceLatencyAvg), 1d);
+        assertEquals((double) max, getMetricValue(metrics, 
rebalanceMetricsManager.rebalanceLatencyMax));
+        assertEquals(2d, getMetricValue(metrics, 
rebalanceMetricsManager.rebalanceTotal));
+        // rate is not tested because it is subject to Rate implementation
+        assertEquals(0d, getMetricValue(metrics, 
rebalanceMetricsManager.failedRebalanceRate));
+        assertEquals(0d, getMetricValue(metrics, 
rebalanceMetricsManager.failedRebalanceTotal));
+        assertEquals(0d, getMetricValue(metrics, 
rebalanceMetricsManager.lastRebalanceSecondsAgo));
+    }
+
+    /**
+     * Checks that a heartbeat failure arriving after a target assignment
+     * has been received (so that rebalanceStarted() is true) is counted as
+     * one failed rebalance: no latency and no successful rebalance are
+     * recorded, and lastRebalanceSecondsAgo stays at -1.
+     */
+    @Test
+    public void testRebalanceMetricsOnFailedRebalance() {
+        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
+        ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(null);
+        membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
+
+        Uuid topicId = Uuid.randomUuid();
+
+        receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
+
+        // sleep for an arbitrary amount
+        time.sleep(2300);
+
+        assertTrue(rebalanceMetricsManager.rebalanceStarted());
+        membershipManager.onHeartbeatFailure();
+
+        assertEquals((double) 0, getMetricValue(metrics, 
rebalanceMetricsManager.rebalanceLatencyTotal));
+        assertEquals(0d, getMetricValue(metrics, 
rebalanceMetricsManager.rebalanceTotal));
+        assertEquals(120d, getMetricValue(metrics, 
rebalanceMetricsManager.failedRebalanceRate));
+        assertEquals(1d, getMetricValue(metrics, 
rebalanceMetricsManager.failedRebalanceTotal));
+        assertEquals(-1d, getMetricValue(metrics, 
rebalanceMetricsManager.lastRebalanceSecondsAgo));
+    }
+
+    /**
+     * Looks up the metric registered under {@code name} in {@code metrics}
+     * and returns its current {@code metricValue()}.
+     * NOTE(review): presumably throws a NullPointerException when no metric
+     * is registered under that name - confirm against Metrics#metrics().
+     */
+    private Object getMetricValue(Metrics metrics, MetricName name) {
+        return metrics.metrics().get(name).metricValue();
+    }
+
     private MembershipManagerImpl 
mockMemberSuccessfullyReceivesAndAcksAssignment(
             Uuid topicId, String topicName, List<Integer> partitions) {
         MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
@@ -2119,7 +2266,7 @@ public class MembershipManagerImplTest {
         when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
         
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty()).thenReturn(Optional.empty());
 
-        
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+        membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
         membershipManager.poll(time.milliseconds());
 
         if (expectSubscriptionUpdated) {
@@ -2136,9 +2283,9 @@ public class MembershipManagerImplTest {
     }
 
     private MembershipManagerImpl createMemberInStableState(String 
groupInstanceId) {
-        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup(groupInstanceId);
+        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup(groupInstanceId, null);
         ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(null);
-        
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+        membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
         assertEquals(MemberState.STABLE, membershipManager.state());
         return membershipManager;
     }
@@ -2150,7 +2297,7 @@ public class MembershipManagerImplTest {
                     .setTopicId(tp.getKey())
                     .setPartitions(new 
ArrayList<>(tp.getValue()))).collect(Collectors.toList()));
         ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(targetAssignment);
-        
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+        membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
     }
 
     private void receiveAssignment(Uuid topicId, List<Integer> partitions, 
MembershipManager membershipManager) {
@@ -2160,7 +2307,7 @@ public class MembershipManagerImplTest {
                                 .setTopicId(topicId)
                                 .setPartitions(partitions)));
         ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(targetAssignment);
-        
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+        membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
     }
 
     private void receiveAssignmentAfterRejoin(Uuid topicId, List<Integer> 
partitions, MembershipManager membershipManager) {
@@ -2171,7 +2318,7 @@ public class MembershipManagerImplTest {
                                 .setPartitions(partitions)));
         ConsumerGroupHeartbeatResponse heartbeatResponse =
                 
createConsumerGroupHeartbeatResponseWithBumpedEpoch(targetAssignment);
-        
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+        membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
     }
 
     private void receiveEmptyAssignment(MembershipManager membershipManager) {
@@ -2179,7 +2326,7 @@ public class MembershipManagerImplTest {
         ConsumerGroupHeartbeatResponseData.Assignment targetAssignment = new 
ConsumerGroupHeartbeatResponseData.Assignment()
                 .setTopicPartitions(Collections.emptyList());
         ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(targetAssignment);
-        
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+        membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
     }
 
     /**
@@ -2307,9 +2454,9 @@ public class MembershipManagerImplTest {
                 .setAssignment(assignment));
     }
 
-    private ConsumerGroupHeartbeatResponse 
createConsumerGroupHeartbeatResponseWithError() {
+    private ConsumerGroupHeartbeatResponse 
createConsumerGroupHeartbeatResponseWithError(Errors error) {
         return new ConsumerGroupHeartbeatResponse(new 
ConsumerGroupHeartbeatResponseData()
-                .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
+                .setErrorCode(error.code())
                 .setMemberId(MEMBER_ID)
                 .setMemberEpoch(5));
     }
@@ -2356,4 +2503,29 @@ public class MembershipManagerImplTest {
             Arguments.of(MemberState.STALE));
     }
 
+    /**
+     * Rebalance listener for tests that, on every revoked/assigned callback,
+     * advances the supplied {@code time} by a fixed duration and adds that
+     * duration to {@code sleepMs}, so a test can read how long the callbacks
+     * "took" since the last {@link #reset()}.
+     */
+    private static class SleepyRebalanceListener implements 
ConsumerRebalanceListener {
+        // Total time slept in callbacks since construction or the last reset()
+        private long sleepMs;
+        // Amount of time slept on each callback invocation
+        private final long sleepDurationMs;
+        private final Time time;
+        SleepyRebalanceListener(long sleepDurationMs, Time time) {
+            this.sleepDurationMs = sleepDurationMs;
+            this.time = time;
+        }
+
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) 
{
+            sleepMs += sleepDurationMs;
+            time.sleep(sleepDurationMs);
+        }
+
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+            sleepMs += sleepDurationMs;
+            time.sleep(sleepDurationMs);
+        }
+
+        // Clears the accumulated sleep time; the per-callback duration is kept
+        public void reset() {
+            sleepMs = 0;
+        }
+    }
 }


Reply via email to