jtuglu1 commented on code in PR #19543:
URL: https://github.com/apache/druid/pull/19543#discussion_r3418762888


##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java:
##########
@@ -131,15 +131,34 @@ protected KinesisSupervisorSpec 
createKinesisSupervisor(KinesisResource kinesis,
 
   /**
    * Waits until the total row count of successfully published segments matches
-   * {@code expectedRowCount}.
+   * {@code expectedRowCount}, using the cluster default emitter timeout.
    */
   protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount)
   {
-    indexer.latchableEmitter().waitForEventAggregate(
-        event -> event.hasMetricName("ingest/rows/published")
-                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
-        agg -> agg.hasSumAtLeast(expectedRowCount)
-    );
+    waitUntilPublishedRecordsAreIngested(expectedRowCount, null);
+  }
+
+  /**
+   * Same as {@link #waitUntilPublishedRecordsAreIngested(int)} but with an 
explicit timeout in millis.
+   * Use for ingestion paths with a heavier task lifecycle (e.g. bounded 
supervisor cold start) where the
+   * cluster default may not allow enough headroom on CI.
+   */
+  protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount, 
Long timeoutMillis)
+  {
+    if (timeoutMillis == null) {

Review Comment:
   Let's mark the `timeoutMillis` as @Nullable and then I'm fine with this 
change unless @kfaraz has objections



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to