This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2bae9c36f29 KAFKA-18952 Fix flaky test in 
MonitorableSinkIntegrationTest (#21057)
2bae9c36f29 is described below

commit 2bae9c36f29b5c4c4033f2d0fe8697e7fa69d162
Author: majialong <[email protected]>
AuthorDate: Sat Jan 3 23:24:49 2026 +0800

    KAFKA-18952 Fix flaky test in MonitorableSinkIntegrationTest (#21057)
    
    The test `testMonitorableSinkConnectorAndTask` is flaky due to a race
    condition between the task thread and the test thread.
    
    The `awaitRecords()` method uses a `CountDownLatch` that counts down in
    `TestableSinkTask.put()` for each record inside the loop, while
    `MonitorableSinkTask.count` is updated after `super.put()` returns. When
    the latch reaches zero, `awaitRecords()` returns immediately, but the
    `count += records.size() `may not have executed yet.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../integration/MonitorableSinkIntegrationTest.java | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkIntegrationTest.java
index c3720690268..e9d74836e01 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkIntegrationTest.java
@@ -58,6 +58,7 @@ public class MonitorableSinkIntegrationTest {
     private static final int NUM_TASKS = 1;
     private static final long CONNECTOR_SETUP_DURATION_MS = 
TimeUnit.SECONDS.toMillis(60);
     private static final long CONSUME_MAX_DURATION_MS = 
TimeUnit.SECONDS.toMillis(30);
+    private static final long METRICS_CONVERGENCE_DURATION_MS = 
TimeUnit.SECONDS.toMillis(5);
 
     private EmbeddedConnectStandalone connect;
     private ConnectorHandle connectorHandle;
@@ -119,11 +120,19 @@ public class MonitorableSinkIntegrationTest {
 
         // check task metric
         metrics = connect.connectMetrics().metrics().metrics();
-        MetricName taskMetric = 
MonitorableSinkConnector.MonitorableSinkTask.metricsName;
-        assertTrue(metrics.containsKey(taskMetric));
-        assertEquals(CONNECTOR_NAME, taskMetric.tags().get("connector"));
-        assertEquals("0", taskMetric.tags().get("task"));
-        assertEquals((double) NUM_RECORDS_PRODUCED, 
metrics.get(taskMetric).metricValue());
+        MetricName taskMetricName = 
MonitorableSinkConnector.MonitorableSinkTask.metricsName;
+        assertTrue(metrics.containsKey(taskMetricName));
+        assertEquals(CONNECTOR_NAME, taskMetricName.tags().get("connector"));
+        assertEquals("0", taskMetricName.tags().get("task"));
+
+        KafkaMetric taskMetric = metrics.get(taskMetricName);
+        // The metric value may not be updated immediately after 
awaitRecords() returns,
+        // because MonitorableSinkTask.count is incremented after 
TestableSinkTask.put()
+        // which triggers the latch countdown. Use waitForCondition to handle 
this race condition.
+        waitForCondition(
+                () -> (double) NUM_RECORDS_PRODUCED == (double) 
taskMetric.metricValue(),
+                METRICS_CONVERGENCE_DURATION_MS,
+                "Task metric did not converge to expected value in time.");
 
         connect.deleteConnector(CONNECTOR_NAME);
         connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME,
@@ -132,7 +141,7 @@ public class MonitorableSinkIntegrationTest {
         // verify connector and task metrics have been deleted
         metrics = connect.connectMetrics().metrics().metrics();
         assertFalse(metrics.containsKey(connectorMetric));
-        assertFalse(metrics.containsKey(taskMetric));
+        assertFalse(metrics.containsKey(taskMetricName));
     }
 
     /**

Reply via email to