This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f45747d84e2 test: KafkaBoundedSupervisorTest flaky test (#19543)
f45747d84e2 is described below

commit f45747d84e262ca52b24d6c83ebf350cf4d6ab99
Author: Shekhar Prasad Rajak <[email protected]>
AuthorDate: Fri Jun 19 11:11:16 2026 +0530

    test: KafkaBoundedSupervisorTest flaky test (#19543)
    
    Changes:
    - Add a `LatchableEmitter.waitForEventAggregate` overload that accepts an explicit timeout
    - Add a `StreamIndexTestBase.waitUntilPublishedRecordsAreIngested` overload that accepts an explicit timeout in millis
---
 .../indexing/KafkaBoundedSupervisorTest.java       |  5 +++--
 .../embedded/indexing/StreamIndexTestBase.java     | 25 +++++++++++++++++++++-
 .../druid/server/metrics/LatchableEmitter.java     | 14 +++++++++++-
 3 files changed, 40 insertions(+), 4 deletions(-)

diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java
index fa184418df5..ad3a3f3fff1 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java
@@ -87,8 +87,9 @@ public class KafkaBoundedSupervisorTest extends 
StreamIndexTestBase
 
     cluster.callApi().postSupervisor(supervisor);
 
-    // Wait for records to be ingested
-    waitUntilPublishedRecordsAreIngested(totalRecords);
+    // Bounded supervisor cold start (post supervisor -> schedule task -> 
consume -> publish) can exceed
+    // the cluster default wait on CI; give it a generous ceiling.
+    waitUntilPublishedRecordsAreIngested(totalRecords, 120_000L);
 
     // Wait for supervisor to transition to COMPLETED state
     waitForSupervisorToComplete(supervisor.getId());
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java
index ee826936b2b..2c31e09284f 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java
@@ -131,7 +131,7 @@ public abstract class StreamIndexTestBase extends 
EmbeddedClusterTestBase
 
   /**
    * Waits until the total row count of successfully published segments matches
-   * {@code expectedRowCount}.
+   * {@code expectedRowCount}, using the cluster default emitter timeout.
    */
   protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount)
   {
@@ -150,6 +150,29 @@ public abstract class StreamIndexTestBase extends 
EmbeddedClusterTestBase
     Assertions.assertEquals(expectedRowCount, totalEventsProcessed);
   }
 
+  /**
+   * Same as {@link #waitUntilPublishedRecordsAreIngested(int)} but with an 
explicit timeout in millis.
+   * Use for ingestion paths with a heavier task lifecycle (e.g. bounded 
supervisor cold start) where the
+   * cluster default may not allow enough headroom on CI.
+   */
+  protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount, 
Long timeoutMillis)
+  {
+    indexer.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("ingest/rows/published")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+        agg -> agg.hasSumAtLeast(expectedRowCount),
+        timeoutMillis
+    );
+
+    final int totalEventsProcessed = indexer
+        .latchableEmitter()
+        .getMetricValues("ingest/rows/published", 
Map.of(DruidMetrics.DATASOURCE, dataSource))
+        .stream()
+        .mapToInt(Number::intValue)
+        .sum();
+    Assertions.assertEquals(expectedRowCount, totalEventsProcessed);
+  }
+
   protected void verifySupervisorIsRunningHealthy(String supervisorId)
   {
     final SupervisorStatus status = 
cluster.callApi().getSupervisorStatus(supervisorId);
diff --git 
a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java 
b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
index e9779c1ddd6..fc4b40af6d6 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
@@ -202,6 +202,18 @@ public class LatchableEmitter extends StubServiceEmitter
       UnaryOperator<EventMatcher> condition,
       UnaryOperator<AggregateMatcher> aggregateCondition
   )
+  {
+    waitForEventAggregate(condition, aggregateCondition, 
defaultWaitTimeoutMillis);
+  }
+
+  /**
+   * Same as {@link #waitForEventAggregate(UnaryOperator, UnaryOperator)} but 
with an explicit timeout.
+   */
+  public void waitForEventAggregate(
+      UnaryOperator<EventMatcher> condition,
+      UnaryOperator<AggregateMatcher> aggregateCondition,
+      long timeoutMillis
+  )
   {
     final EventMatcher eventMatcher = condition.apply(new EventMatcher());
     final AggregateMatcher aggregateMatcher = aggregateCondition.apply(new 
AggregateMatcher());
@@ -210,7 +222,7 @@ public class LatchableEmitter extends StubServiceEmitter
         event -> event instanceof ServiceMetricEvent
                  && eventMatcher.test((ServiceMetricEvent) event)
                  && aggregateMatcher.test((ServiceMetricEvent) event),
-        defaultWaitTimeoutMillis
+        timeoutMillis
     );
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to