This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2bae9c36f29 KAFKA-18952 Fix flaky test in MonitorableSinkIntegrationTest (#21057)
2bae9c36f29 is described below
commit 2bae9c36f29b5c4c4033f2d0fe8697e7fa69d162
Author: majialong <[email protected]>
AuthorDate: Sat Jan 3 23:24:49 2026 +0800
KAFKA-18952 Fix flaky test in MonitorableSinkIntegrationTest (#21057)
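The test `testMonitorableSinkConnectorAndTask` is flaky due to a race
condition between the task thread and the test thread.
The `awaitRecords()` method waits on a `CountDownLatch` that
`TestableSinkTask.put()` counts down once per record inside its loop,
whereas `MonitorableSinkTask.count` is only updated after `super.put()`
returns. As soon as the latch reaches zero, `awaitRecords()` returns, but
`count += records.size()` may not have executed yet, so the task metric
can still report a stale value.
This change keeps the assertions on the task metric's name and tags, but
replaces the immediate assertion on its value with `waitForCondition`. The
test now waits up to 5 seconds (`METRICS_CONVERGENCE_DURATION_MS`) for the
metric to equal the number of records produced.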
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../integration/MonitorableSinkIntegrationTest.java | 21 +++++++++++++++------
1 file changed, 15 insertions(+), 6 deletions(-)
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkIntegrationTest.java
index c3720690268..e9d74836e01 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkIntegrationTest.java
@@ -58,6 +58,7 @@ public class MonitorableSinkIntegrationTest {
private static final int NUM_TASKS = 1;
private static final long CONNECTOR_SETUP_DURATION_MS =
TimeUnit.SECONDS.toMillis(60);
private static final long CONSUME_MAX_DURATION_MS =
TimeUnit.SECONDS.toMillis(30);
+ private static final long METRICS_CONVERGENCE_DURATION_MS =
TimeUnit.SECONDS.toMillis(5);
private EmbeddedConnectStandalone connect;
private ConnectorHandle connectorHandle;
@@ -119,11 +120,19 @@ public class MonitorableSinkIntegrationTest {
// check task metric
metrics = connect.connectMetrics().metrics().metrics();
- MetricName taskMetric =
MonitorableSinkConnector.MonitorableSinkTask.metricsName;
- assertTrue(metrics.containsKey(taskMetric));
- assertEquals(CONNECTOR_NAME, taskMetric.tags().get("connector"));
- assertEquals("0", taskMetric.tags().get("task"));
- assertEquals((double) NUM_RECORDS_PRODUCED,
metrics.get(taskMetric).metricValue());
+ MetricName taskMetricName =
MonitorableSinkConnector.MonitorableSinkTask.metricsName;
+ assertTrue(metrics.containsKey(taskMetricName));
+ assertEquals(CONNECTOR_NAME, taskMetricName.tags().get("connector"));
+ assertEquals("0", taskMetricName.tags().get("task"));
+
+ KafkaMetric taskMetric = metrics.get(taskMetricName);
+ // The metric value may not be updated immediately after
awaitRecords() returns,
+ // because MonitorableSinkTask.count is incremented after
TestableSinkTask.put()
+ // which triggers the latch countdown. Use waitForCondition to handle
this race condition.
+ waitForCondition(
+ () -> (double) NUM_RECORDS_PRODUCED == (double)
taskMetric.metricValue(),
+ METRICS_CONVERGENCE_DURATION_MS,
+ "Task metric did not converge to expected value in time.");
connect.deleteConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME,
@@ -132,7 +141,7 @@ public class MonitorableSinkIntegrationTest {
// verify connector and task metrics have been deleted
metrics = connect.connectMetrics().metrics().metrics();
assertFalse(metrics.containsKey(connectorMetric));
- assertFalse(metrics.containsKey(taskMetric));
+ assertFalse(metrics.containsKey(taskMetricName));
}
/**