This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c47d68ae4b5 KAFKA-20296: Fix flaky consumer metrics test (#21713)
c47d68ae4b5 is described below
commit c47d68ae4b5eee38082a97d85cdc0a20c771c89f
Author: Lianet Magrans <[email protected]>
AuthorDate: Thu Mar 12 12:12:21 2026 -0400
KAFKA-20296: Fix flaky consumer metrics test (#21713)
The test has been flaky on trunk and in PRs, failing when it still finds a
metric that is supposed to be removed when preparing fetch requests.
The metric is only removed when sending new fetch requests (in
prepareFetchRequests), but the test only waited for a rebalance
(callbacks triggered), which does not guarantee that a new fetch request
was prepared. The test now keeps polling the consumer until the metrics
for the removed partitions are gone.
Reviewers: Andrew Schofield <[email protected]>, Nilesh Kumar
<[email protected]>
---
.../clients/consumer/PlaintextConsumerTest.java | 30 +++++++++++++---------
1 file changed, 18 insertions(+), 12 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
index 52ae32c6b30..4dd432859e9 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
@@ -889,13 +889,9 @@ public class PlaintextConsumerTest {
assertNotNull(fetchLead0);
assertEquals((double) records.count(), fetchLead0.metricValue(),
"The lead should be " + records.count());
- // Remove topic from subscription
+ // Remove topic from subscription and wait for metrics cleanup.
consumer.subscribe(List.of(topic2), listener);
- awaitRebalance(consumer, listener);
-
- // Verify the metric has gone
- assertNull(consumer.metrics().get(new MetricName("records-lead",
"consumer-fetch-manager-metrics", "", tags1)));
- assertNull(consumer.metrics().get(new MetricName("records-lead",
"consumer-fetch-manager-metrics", "", tags2)));
+ awaitMetricsCleanup(consumer, "records-lead", tags1, tags2);
}
}
@@ -957,13 +953,9 @@ public class PlaintextConsumerTest {
var expectedLag = numMessages - records.count();
assertEquals(expectedLag, (double) fetchLag0.metricValue(),
EPSILON, "The lag should be " + expectedLag);
- // Remove topic from subscription
+ // Remove topic from subscription and wait for metrics cleanup.
consumer.subscribe(List.of(topic2), listener);
- awaitRebalance(consumer, listener);
-
- // Verify the metric has gone
- assertNull(consumer.metrics().get(new MetricName("records-lag",
"consumer-fetch-manager-metrics", "", tags1)));
- assertNull(consumer.metrics().get(new MetricName("records-lag",
"consumer-fetch-manager-metrics", "", tags2)));
+ awaitMetricsCleanup(consumer, "records-lag", tags1, tags2);
}
}
@@ -1751,6 +1743,20 @@ public class PlaintextConsumerTest {
return result.get();
}
+ private void awaitMetricsCleanup(
+ Consumer<?, ?> consumer,
+ String metricName,
+ Map<String, String> tags1,
+ Map<String, String> tags2
+ ) throws InterruptedException {
+ var metric1 = new MetricName(metricName,
"consumer-fetch-manager-metrics", "", tags1);
+ var metric2 = new MetricName(metricName,
"consumer-fetch-manager-metrics", "", tags2);
+ TestUtils.waitForCondition(() -> {
+ consumer.poll(Duration.ofMillis(100));
+ return consumer.metrics().get(metric1) == null &&
consumer.metrics().get(metric2) == null;
+ }, "Metrics for removed partitions should be cleaned up");
+ }
+
public static class SerializerImpl implements Serializer<byte[]> {
private final ByteArraySerializer serializer = new
ByteArraySerializer();