kfaraz commented on code in PR #19543:
URL: https://github.com/apache/druid/pull/19543#discussion_r3339361581
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java:
##########
@@ -131,15 +131,34 @@ protected KinesisSupervisorSpec
createKinesisSupervisor(KinesisResource kinesis,
/**
* Waits until the total row count of successfully published segments matches
- * {@code expectedRowCount}.
+ * {@code expectedRowCount}, using the cluster default emitter timeout.
*/
protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount)
{
- indexer.latchableEmitter().waitForEventAggregate(
- event -> event.hasMetricName("ingest/rows/published")
- .hasDimension(DruidMetrics.DATASOURCE, dataSource),
- agg -> agg.hasSumAtLeast(expectedRowCount)
- );
+ waitUntilPublishedRecordsAreIngested(expectedRowCount, null);
+ }
+
+ /**
+ * Same as {@link #waitUntilPublishedRecordsAreIngested(int)} but with an
explicit timeout in millis.
+ * Use for ingestion paths with a heavier task lifecycle (e.g. bounded
supervisor cold start) where the
+ * cluster default may not allow enough headroom on CI.
+ */
+ protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount,
Long timeoutMillis)
+ {
+ if (timeoutMillis == null) {
Review Comment:
Instead of making the timeout nullable in this method and then adding an
if-else anyway, do the following:
- Leave the original method unchanged.
- Add a new method which accepts a primitive `long timeout`, which must
always be non-null and greater than 0.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]