This is an automated email from the ASF dual-hosted git repository.

abhishekrb19 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ce8a309071 test: Fix row-count race in EmbeddedKafkaSupervisorTest 
(#19581)
1ce8a309071 is described below

commit 1ce8a30907115327ea6df27b700d21d214d22460
Author: Andreas Maechler <[email protected]>
AuthorDate: Mon Jun 15 11:22:18 2026 -0600

    test: Fix row-count race in EmbeddedKafkaSupervisorTest (#19581)
    
    test_runKafkaSupervisor produced 10 records, waited only for the broker to
    discover the datasource (the first matching metric event), then immediately
    asserted SELECT COUNT(*) == 10. Under a loaded CI runner the query raced
    ingestion and saw fewer than 10 rows (expected:<10> but was:<8/7/9>).
    
    Wait for ingest/events/processed to reach the expected count before 
querying,
    matching the sibling test_runSupervisor_withEmptyDimension, and derive the
    expected count from expectedSegments instead of a hardcoded literal.
---
 .../kafka/simulate/EmbeddedKafkaSupervisorTest.java         | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
index 6f0fa52317d..9bfa4cf1607 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
@@ -125,8 +125,19 @@ public class EmbeddedKafkaSupervisorTest extends 
EmbeddedClusterTestBase
     Assertions.assertEquals(1, taskStatuses.size());
     Assertions.assertEquals(TaskState.RUNNING, 
taskStatuses.get(0).getStatusCode());
 
+    // Wait until all produced records have been ingested before verifying the 
row count,
+    // otherwise the query below can race ingestion and observe fewer than the 
expected rows
+    indexer.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("ingest/events/processed")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+        agg -> agg.hasSumAtLeast(expectedSegments)
+    );
+
     // Verify the count of rows ingested into the datasource so far
-    Assertions.assertEquals("10", cluster.runSql("SELECT COUNT(*) FROM %s", 
dataSource));
+    Assertions.assertEquals(
+        String.valueOf(expectedSegments),
+        cluster.runSql("SELECT COUNT(*) FROM %s", dataSource)
+    );
 
     // Suspend the supervisor and verify the state
     
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to