This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2da02d9fcf7 KAFKA-19723 Adding consumer rebalance metrics test (#20565)
2da02d9fcf7 is described below
commit 2da02d9fcf72ef0d73dbf958f2e24ce179bfd6ff
Author: Arpit Goyal <[email protected]>
AuthorDate: Mon Oct 6 23:57:50 2025 +0530
KAFKA-19723 Adding consumer rebalance metrics test (#20565)
Added test cases for ConsumerRebalanceMetricsManager.
Reviewers: Lianet Magrans <[email protected]>, TengYao Chi
<[email protected]>, Hong-Yi Chen <[email protected]>
---
.../ConsumerRebalanceMetricsManagerTest.java | 298 ++++++++++++++++++++-
1 file changed, 288 insertions(+), 10 deletions(-)
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManagerTest.java
index a7d91227767..a5c52fac090 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManagerTest.java
@@ -19,45 +19,323 @@ package org.apache.kafka.clients.consumer.internals.metrics;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Optional;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
class ConsumerRebalanceMetricsManagerTest {
- private final Time time = new MockTime();
- private final Metrics metrics = new Metrics(time);
+ private Time time;
+ private Metrics metrics;
+ private SubscriptionState subscriptionState;
+ private ConsumerRebalanceMetricsManager metricsManager;
+ private MetricConfig metricConfig;
+ private long windowSizeMs;
+ private int numSamples;
+
+ @BeforeEach
+ public void setUp() {
+ time = new MockTime();
+ // Use MetricConfig with its default values
+ windowSizeMs = 30000; // 30 seconds - default value
+ numSamples = 2; // default value
+ metricConfig = new MetricConfig()
+ .samples(numSamples)
+ .timeWindow(windowSizeMs,
java.util.concurrent.TimeUnit.MILLISECONDS);
+ metrics = new Metrics(metricConfig, time);
+ subscriptionState = new SubscriptionState(mock(LogContext.class),
AutoOffsetResetStrategy.EARLIEST);
+ metricsManager = new ConsumerRebalanceMetricsManager(metrics,
subscriptionState);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ metrics.close();
+ }
@Test
public void testAssignedPartitionCountMetric() {
- SubscriptionState subscriptionState = new
SubscriptionState(mock(LogContext.class), AutoOffsetResetStrategy.EARLIEST);
- ConsumerRebalanceMetricsManager consumerRebalanceMetricsManager = new
ConsumerRebalanceMetricsManager(metrics, subscriptionState);
-
-
assertNotNull(metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount),
"Metric assigned-partitions has not been registered as expected");
+ assertNotNull(metrics.metric(metricsManager.assignedPartitionsCount),
"Metric assigned-partitions has not been registered as expected");
// Check for manually assigned partitions
subscriptionState.assignFromUser(Set.of(new TopicPartition("topic",
0), new TopicPartition("topic", 1)));
- assertEquals(2.0d,
metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue());
+ assertEquals(2.0d,
metrics.metric(metricsManager.assignedPartitionsCount).metricValue());
subscriptionState.assignFromUser(Set.of());
- assertEquals(0.0d,
metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue());
+ assertEquals(0.0d,
metrics.metric(metricsManager.assignedPartitionsCount).metricValue());
subscriptionState.unsubscribe();
- assertEquals(0.0d,
metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue());
+ assertEquals(0.0d,
metrics.metric(metricsManager.assignedPartitionsCount).metricValue());
// Check for automatically assigned partitions
subscriptionState.subscribe(Set.of("topic"), Optional.empty());
subscriptionState.assignFromSubscribed(Set.of(new
TopicPartition("topic", 0)));
- assertEquals(1.0d,
metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue());
+ assertEquals(1.0d,
metrics.metric(metricsManager.assignedPartitionsCount).metricValue());
+ }
+
+ @Test
+ public void testRebalanceTimingMetrics() {
+
+ // Verify timing metrics are registered
+ assertNotNull(metrics.metric(metricsManager.rebalanceLatencyAvg));
+ assertNotNull(metrics.metric(metricsManager.rebalanceLatencyMax));
+ assertNotNull(metrics.metric(metricsManager.rebalanceLatencyTotal));
+ assertNotNull(metrics.metric(metricsManager.rebalanceTotal));
+
+ // Record first rebalance (10ms duration)
+ metricsManager.recordRebalanceStarted(time.milliseconds());
+ time.sleep(10);
+ metricsManager.recordRebalanceEnded(time.milliseconds());
+
+ // Verify metrics after first rebalance
+ assertEquals(10.0d,
metrics.metric(metricsManager.rebalanceLatencyAvg).metricValue());
+ assertEquals(10.0d,
metrics.metric(metricsManager.rebalanceLatencyMax).metricValue());
+ assertEquals(10.0d,
metrics.metric(metricsManager.rebalanceLatencyTotal).metricValue());
+ assertEquals(1.0d,
metrics.metric(metricsManager.rebalanceTotal).metricValue());
+
+ // Record second rebalance (30ms duration)
+ metricsManager.recordRebalanceStarted(time.milliseconds());
+ time.sleep(30);
+ metricsManager.recordRebalanceEnded(time.milliseconds());
+
+ // Verify metrics after second rebalance
+ assertEquals(20.0d,
metrics.metric(metricsManager.rebalanceLatencyAvg).metricValue(),
+ "Average latency should be (10 + 30) / 2 = 20ms");
+ assertEquals(30.0d,
metrics.metric(metricsManager.rebalanceLatencyMax).metricValue(),
+ "Max latency should be max(10, 30) = 30ms");
+ assertEquals(40.0d,
metrics.metric(metricsManager.rebalanceLatencyTotal).metricValue(),
+ "Total latency should be 10 + 30 = 40ms");
+ assertEquals(2.0d,
metrics.metric(metricsManager.rebalanceTotal).metricValue());
+
+ // Record third rebalance (50ms duration)
+ metricsManager.recordRebalanceStarted(time.milliseconds());
+ time.sleep(50);
+ metricsManager.recordRebalanceEnded(time.milliseconds());
+
+ // Verify metrics after third rebalance
+ assertEquals(30.0d,
metrics.metric(metricsManager.rebalanceLatencyAvg).metricValue(),
+ "Average latency should be (10 + 30 + 50) / 3 = 30ms");
+ assertEquals(50.0d,
metrics.metric(metricsManager.rebalanceLatencyMax).metricValue(),
+ "Max latency should be max(10, 30, 50) = 50ms");
+ assertEquals(90.0d,
metrics.metric(metricsManager.rebalanceLatencyTotal).metricValue(),
+ "Total latency should be 10 + 30 + 50 = 90ms");
+ assertEquals(3.0d,
metrics.metric(metricsManager.rebalanceTotal).metricValue());
+ }
+
+ @Test
+ public void testRebalanceRateMetric() {
+
+ // Verify rate metric is registered
+ assertNotNull(metrics.metric(metricsManager.rebalanceRatePerHour));
+
+ // Record 3 rebalances within 30ms total (3 x 10ms)
+ int rebalanceCount = 3;
+ long startTime = time.milliseconds();
+ for (int i = 0; i < rebalanceCount; i++) {
+ metricsManager.recordRebalanceStarted(time.milliseconds());
+ time.sleep(10);
+ metricsManager.recordRebalanceEnded(time.milliseconds());
+ }
+ long endTime = time.milliseconds();
+ long actualElapsedMs = endTime - startTime;
+
+ double ratePerHour = (Double)
metrics.metric(metricsManager.rebalanceRatePerHour).metricValue();
+
+ // The Rate metric calculation:
+ // - Uses elapsed time from the oldest sample
+ // - Ensures minimum window size of (numSamples - 1) * windowSizeMs
+ // - With default config: minWindow = (2-1) * 30000 = 30000ms
+ long minWindowMs = (numSamples - 1) * windowSizeMs; // (2-1) * 30000 =
30000ms
+
+ // Since actualElapsedMs (30ms) is much less than minWindowMs
(30000ms),
+ // the rate calculation will use minWindowMs as the window
+ // Rate per hour = count / (windowMs / 1000) * 3600
+ double expectedRatePerHour = (double) rebalanceCount / (minWindowMs /
1000.0) * 3600.0;
+
+ assertEquals(expectedRatePerHour, ratePerHour, 1.0,
+ String.format("With %d rebalances in %dms, min window %dms:
expecting %.1f rebalances/hour",
+ rebalanceCount, actualElapsedMs, minWindowMs,
expectedRatePerHour));
+ }
+
+ @Test
+ public void testFailedRebalanceMetrics() {
+
+ // Verify failed rebalance metrics are registered
+ assertNotNull(metrics.metric(metricsManager.failedRebalanceTotal));
+ assertNotNull(metrics.metric(metricsManager.failedRebalanceRate));
+
+ assertEquals(0.0d,
metrics.metric(metricsManager.failedRebalanceTotal).metricValue(),
+ "Initially, there should be no failed rebalances");
+
+ // Start a rebalance but don't complete it
+ metricsManager.recordRebalanceStarted(time.milliseconds());
+ time.sleep(10);
+
+ metricsManager.maybeRecordRebalanceFailed();
+ assertEquals(1.0d,
metrics.metric(metricsManager.failedRebalanceTotal).metricValue(),
+ "Failed rebalance count should increment to 1 after recording
failure");
+
+ // Complete a successful rebalance
+ metricsManager.recordRebalanceStarted(time.milliseconds());
+ time.sleep(10);
+ metricsManager.recordRebalanceEnded(time.milliseconds());
+
+ metricsManager.maybeRecordRebalanceFailed();
+ assertEquals(1.0d,
metrics.metric(metricsManager.failedRebalanceTotal).metricValue(),
+ "Failed count should not increment after successful rebalance
completes");
+
+ // Start another rebalance, don't complete it, then record failure
+ time.sleep(10);
+ metricsManager.recordRebalanceStarted(time.milliseconds());
+ assertTrue(metricsManager.rebalanceStarted(), "Rebalance should be in
progress");
+ time.sleep(10);
+ // Don't call recordRebalanceEnded() to simulate an incomplete
rebalance
+ metricsManager.maybeRecordRebalanceFailed();
+ assertEquals(2.0d,
metrics.metric(metricsManager.failedRebalanceTotal).metricValue());
+
+ double failedRate = (Double)
metrics.metric(metricsManager.failedRebalanceRate).metricValue();
+
+ // Calculate expected failed rate based on Rate metric behavior
+ // We had 2 failures over ~40ms, but minimum window is (numSamples -
1) * windowSizeMs
+ long minWindowMs = (numSamples - 1) * windowSizeMs; // (2-1) * 30000 =
30000ms
+ double expectedFailedRatePerHour = 2.0 / (minWindowMs / 1000.0) *
3600.0;
+
+ assertEquals(expectedFailedRatePerHour, failedRate, 1.0,
+ String.format("With 2 failures, min window %dms: expecting
%.1f failures/hour",
+ minWindowMs, expectedFailedRatePerHour));
+ }
+
+ @Test
+ public void testLastRebalanceSecondsAgoMetric() {
+
+ // Verify metric is registered
+ assertNotNull(metrics.metric(metricsManager.lastRebalanceSecondsAgo));
+
+ assertEquals(-1.0d,
metrics.metric(metricsManager.lastRebalanceSecondsAgo).metricValue(),
+ "Should return -1 when no rebalance has occurred");
+
+ // Complete a rebalance
+ metricsManager.recordRebalanceStarted(time.milliseconds());
+ time.sleep(10);
+ metricsManager.recordRebalanceEnded(time.milliseconds());
+
+ assertEquals(0.0d,
metrics.metric(metricsManager.lastRebalanceSecondsAgo).metricValue(),
+ "Should return 0 immediately after rebalance completes");
+
+ // Advance time by 5 seconds
+ time.sleep(5000);
+ assertEquals(5.0d,
metrics.metric(metricsManager.lastRebalanceSecondsAgo).metricValue());
+
+ // Advance time by another 10 seconds
+ time.sleep(10000);
+ assertEquals(15.0d,
metrics.metric(metricsManager.lastRebalanceSecondsAgo).metricValue());
+
+ // Complete another rebalance
+ metricsManager.recordRebalanceStarted(time.milliseconds());
+ time.sleep(20);
+ metricsManager.recordRebalanceEnded(time.milliseconds());
+
+ assertEquals(0.0d,
metrics.metric(metricsManager.lastRebalanceSecondsAgo).metricValue(),
+ "Should reset to 0 after a new rebalance completes");
+ }
+
+ @Test
+ public void testRebalanceStartedFlag() {
+
+ assertFalse(metricsManager.rebalanceStarted(),
+ "Initially, no rebalance should be in progress");
+
+ metricsManager.recordRebalanceStarted(time.milliseconds());
+ assertTrue(metricsManager.rebalanceStarted(),
+ "Rebalance should be marked as started after
recordRebalanceStarted()");
+
+ time.sleep(10);
+ metricsManager.recordRebalanceEnded(time.milliseconds());
+ assertFalse(metricsManager.rebalanceStarted(),
+ "Rebalance should not be in progress after
recordRebalanceEnded()");
+
+ // Start another rebalance - advance time first
+ time.sleep(100);
+ metricsManager.recordRebalanceStarted(time.milliseconds());
+ assertTrue(metricsManager.rebalanceStarted(),
+ "New rebalance should be marked as started");
+ }
+
+ @Test
+ public void testMultipleConsecutiveFailures() {
+
+ // Record multiple consecutive failures
+ for (int i = 0; i < 5; i++) {
+ metricsManager.recordRebalanceStarted(time.milliseconds());
+ time.sleep(10);
+ metricsManager.maybeRecordRebalanceFailed();
+ }
+
+ assertEquals(5.0d,
metrics.metric(metricsManager.failedRebalanceTotal).metricValue(),
+ "Should have recorded 5 consecutive failed rebalances");
+
+ assertEquals(0.0d,
metrics.metric(metricsManager.rebalanceTotal).metricValue(),
+ "Successful rebalance count should remain 0 when only failures
occur");
+ }
+
+ @Test
+ public void testMixedSuccessAndFailureScenarios() {
+
+ // Success -> Failure -> Success -> Failure pattern
+ // First success
+ metricsManager.recordRebalanceStarted(time.milliseconds());
+ time.sleep(20);
+ metricsManager.recordRebalanceEnded(time.milliseconds());
+ assertEquals(1.0d,
metrics.metric(metricsManager.rebalanceTotal).metricValue());
+
+ // First failure
+ time.sleep(10);
+ metricsManager.recordRebalanceStarted(time.milliseconds());
+ assertTrue(metricsManager.rebalanceStarted(), "First failure rebalance
should be in progress");
+ time.sleep(30);
+ metricsManager.maybeRecordRebalanceFailed();
+
+ double failedAfterFirst = (Double)
metrics.metric(metricsManager.failedRebalanceTotal).metricValue();
+ assertEquals(1.0d, failedAfterFirst, "Should have recorded one failed
rebalance after first failure");
+
+ // Second success
+ time.sleep(10);
+ metricsManager.recordRebalanceStarted(time.milliseconds());
+ time.sleep(40);
+ metricsManager.recordRebalanceEnded(time.milliseconds());
+ assertEquals(2.0d,
metrics.metric(metricsManager.rebalanceTotal).metricValue());
+
+ // Second failure
+ time.sleep(10);
+ metricsManager.recordRebalanceStarted(time.milliseconds());
+ assertTrue(metricsManager.rebalanceStarted(), "Second failure
rebalance should be in progress");
+ time.sleep(50);
+ metricsManager.maybeRecordRebalanceFailed();
+
+ assertEquals(2.0d,
metrics.metric(metricsManager.rebalanceTotal).metricValue(),
+ "Should have 2 successful rebalances in mixed scenario");
+ assertEquals(2.0d,
metrics.metric(metricsManager.failedRebalanceTotal).metricValue(),
+ "Should have 2 failed rebalances in mixed scenario");
+
+ assertEquals(30.0d,
metrics.metric(metricsManager.rebalanceLatencyAvg).metricValue(),
+ "Average latency should only include successful rebalances:
(20 + 40) / 2 = 30ms");
+ assertEquals(40.0d,
metrics.metric(metricsManager.rebalanceLatencyMax).metricValue(),
+ "Max latency should be 40ms from successful rebalances only");
+ assertEquals(60.0d,
metrics.metric(metricsManager.rebalanceLatencyTotal).metricValue(),
+ "Total latency should only include successful rebalances: 20 +
40 = 60ms");
}
}