This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c47d68ae4b5 KAFKA-20296: Fix flaky consumer metrics test (#21713)
c47d68ae4b5 is described below

commit c47d68ae4b5eee38082a97d85cdc0a20c771c89f
Author: Lianet Magrans <[email protected]>
AuthorDate: Thu Mar 12 12:12:21 2026 -0400

    KAFKA-20296: Fix flaky consumer metrics test (#21713)
    
    It has been flaky on trunk and PRs, failing when it continues to find a
    metric that is supposed to the removed when preparing fetch requests.
    
    The metric is only removed when sending new fetch requests (on
    prepareFetchRequests), but the test was just waiting for a rebalance
    (callbacks triggered) , which really does not guarantee that we prepared
    a new fetch request.
    
    Reviewers: Andrew Schofield <[email protected]>, Nilesh Kumar
     <[email protected]>
---
 .../clients/consumer/PlaintextConsumerTest.java    | 30 +++++++++++++---------
 1 file changed, 18 insertions(+), 12 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
index 52ae32c6b30..4dd432859e9 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
@@ -889,13 +889,9 @@ public class PlaintextConsumerTest {
             assertNotNull(fetchLead0);
             assertEquals((double) records.count(), fetchLead0.metricValue(), 
"The lead should be " + records.count());
 
-            // Remove topic from subscription
+            // Remove topic from subscription and wait for metrics cleanup.
             consumer.subscribe(List.of(topic2), listener);
-            awaitRebalance(consumer, listener);
-
-            // Verify the metric has gone
-            assertNull(consumer.metrics().get(new MetricName("records-lead", 
"consumer-fetch-manager-metrics", "", tags1)));
-            assertNull(consumer.metrics().get(new MetricName("records-lead", 
"consumer-fetch-manager-metrics", "", tags2)));
+            awaitMetricsCleanup(consumer, "records-lead", tags1, tags2);
         }
     }
 
@@ -957,13 +953,9 @@ public class PlaintextConsumerTest {
             var expectedLag = numMessages - records.count();
             assertEquals(expectedLag, (double) fetchLag0.metricValue(), 
EPSILON, "The lag should be " + expectedLag);
 
-            // Remove topic from subscription
+            // Remove topic from subscription and wait for metrics cleanup.
             consumer.subscribe(List.of(topic2), listener);
-            awaitRebalance(consumer, listener);
-
-            // Verify the metric has gone
-            assertNull(consumer.metrics().get(new MetricName("records-lag", 
"consumer-fetch-manager-metrics", "", tags1)));
-            assertNull(consumer.metrics().get(new MetricName("records-lag", 
"consumer-fetch-manager-metrics", "", tags2)));
+            awaitMetricsCleanup(consumer, "records-lag", tags1, tags2);
         }
     }
 
@@ -1751,6 +1743,20 @@ public class PlaintextConsumerTest {
         return result.get();
     }
 
+    private void awaitMetricsCleanup(
+        Consumer<?, ?> consumer,
+        String metricName,
+        Map<String, String> tags1,
+        Map<String, String> tags2
+    ) throws InterruptedException {
+        var metric1 = new MetricName(metricName, 
"consumer-fetch-manager-metrics", "", tags1);
+        var metric2 = new MetricName(metricName, 
"consumer-fetch-manager-metrics", "", tags2);
+        TestUtils.waitForCondition(() -> {
+            consumer.poll(Duration.ofMillis(100));
+            return consumer.metrics().get(metric1) == null && 
consumer.metrics().get(metric2) == null;
+        }, "Metrics for removed partitions should be cleaned up");
+    }
+
     public static class SerializerImpl implements Serializer<byte[]> {
         private final ByteArraySerializer serializer = new 
ByteArraySerializer();
 

Reply via email to