This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f45747d84e2 test: KafkaBoundedSupervisorTest flaky test (#19543)
f45747d84e2 is described below
commit f45747d84e262ca52b24d6c83ebf350cf4d6ab99
Author: Shekhar Prasad Rajak <[email protected]>
AuthorDate: Fri Jun 19 11:11:16 2026 +0530
test: KafkaBoundedSupervisorTest flaky test (#19543)
Changes:
- Add a `LatchableEmitter.waitForEventAggregate` overload which waits for a
caller-specified timeout
- Add a `StreamIndexTestBase.waitUntilPublishedRecordsAreIngested` overload
which waits for a caller-specified timeout
---
.../indexing/KafkaBoundedSupervisorTest.java | 5 +++--
.../embedded/indexing/StreamIndexTestBase.java | 25 +++++++++++++++++++++-
.../druid/server/metrics/LatchableEmitter.java | 14 +++++++++++-
3 files changed, 40 insertions(+), 4 deletions(-)
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java
index fa184418df5..ad3a3f3fff1 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java
@@ -87,8 +87,9 @@ public class KafkaBoundedSupervisorTest extends StreamIndexTestBase
cluster.callApi().postSupervisor(supervisor);
- // Wait for records to be ingested
- waitUntilPublishedRecordsAreIngested(totalRecords);
+ // Bounded supervisor cold start (post supervisor -> schedule task -> consume -> publish) can exceed
+ // the cluster default wait on CI; give it a generous ceiling.
+ waitUntilPublishedRecordsAreIngested(totalRecords, 120_000L);
// Wait for supervisor to transition to COMPLETED state
waitForSupervisorToComplete(supervisor.getId());
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java
index ee826936b2b..2c31e09284f 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java
@@ -131,7 +131,7 @@ public abstract class StreamIndexTestBase extends EmbeddedClusterTestBase
/**
* Waits until the total row count of successfully published segments matches
- * {@code expectedRowCount}.
+ * {@code expectedRowCount}, using the cluster default emitter timeout.
*/
protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount)
{
@@ -150,6 +150,29 @@ public abstract class StreamIndexTestBase extends EmbeddedClusterTestBase
Assertions.assertEquals(expectedRowCount, totalEventsProcessed);
}
+ /**
+ * Same as {@link #waitUntilPublishedRecordsAreIngested(int)} but with an explicit timeout in millis.
+ * Use for ingestion paths with a heavier task lifecycle (e.g. bounded supervisor cold start) where the
+ * cluster default may not allow enough headroom on CI.
+ */
+ protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount, Long timeoutMillis)
+ {
+ indexer.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("ingest/rows/published")
+ .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+ agg -> agg.hasSumAtLeast(expectedRowCount),
+ timeoutMillis
+ );
+
+ final int totalEventsProcessed = indexer
+ .latchableEmitter()
+ .getMetricValues("ingest/rows/published", Map.of(DruidMetrics.DATASOURCE, dataSource))
+ .stream()
+ .mapToInt(Number::intValue)
+ .sum();
+ Assertions.assertEquals(expectedRowCount, totalEventsProcessed);
+ }
+
protected void verifySupervisorIsRunningHealthy(String supervisorId)
{
final SupervisorStatus status =
cluster.callApi().getSupervisorStatus(supervisorId);
diff --git a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
index e9779c1ddd6..fc4b40af6d6 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
@@ -202,6 +202,18 @@ public class LatchableEmitter extends StubServiceEmitter
UnaryOperator<EventMatcher> condition,
UnaryOperator<AggregateMatcher> aggregateCondition
)
+ {
+ waitForEventAggregate(condition, aggregateCondition, defaultWaitTimeoutMillis);
+ }
+
+ /**
+ * Same as {@link #waitForEventAggregate(UnaryOperator, UnaryOperator)} but with an explicit timeout.
+ */
+ public void waitForEventAggregate(
+ UnaryOperator<EventMatcher> condition,
+ UnaryOperator<AggregateMatcher> aggregateCondition,
+ long timeoutMillis
+ )
{
final EventMatcher eventMatcher = condition.apply(new EventMatcher());
final AggregateMatcher aggregateMatcher = aggregateCondition.apply(new AggregateMatcher());
@@ -210,7 +222,7 @@ public class LatchableEmitter extends StubServiceEmitter
event -> event instanceof ServiceMetricEvent
&& eventMatcher.test((ServiceMetricEvent) event)
&& aggregateMatcher.test((ServiceMetricEvent) event),
- defaultWaitTimeoutMillis
+ timeoutMillis
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]