FrankChen021 commented on code in PR #19594:
URL: https://github.com/apache/druid/pull/19594#discussion_r3435734643


##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java:
##########
@@ -215,34 +224,67 @@ public void 
test_autoScaler_scalesUpAndDown_withSlowPublish()
         .build(dataSource, topic);
     cluster.callApi().postSupervisor(supervisor);
 
-    // Ingest a large number of records to trigger a scale-up
-    // 10k records = 100 segments to publish * 100 rows per segment
-    int totalRecords = 0;
-    for (int i = 0; i < 10; ++i) {
-      totalRecords += publish1kRecords(topic, false);
-    }
+    overlord.latchableEmitter()
+            .waitForEvent(event -> event.hasMetricName("task/run/time")
+                                        .hasDimension(DruidMetrics.DATASOURCE, 
dataSource));
 
-    // Wait for tasks to scale up
-    overlord.latchableEmitter().waitForEvent(
-        event -> event.hasMetricName("task/autoScaler/updatedCount")
-                      .hasDimension(DruidMetrics.SUPERVISOR_ID, 
supervisor.getId())
-                      .hasValueMatching(Matchers.greaterThan(1L))
-    );
-    Assertions.assertEquals(4, getCurrentTaskCount(supervisor.getId()));
-    waitUntilPublishedRecordsAreIngested(totalRecords);
+    final AtomicBoolean keepPublishing = new AtomicBoolean(true);
+    final AtomicInteger totalRecords = new AtomicInteger();
+    final ExecutorService publisher = Executors.newSingleThreadExecutor();
+    final Future<?> publisherFuture = publisher.submit(() -> {
+      for (int i = 0; i < MAX_SCALE_UP_RECORD_BATCHES && keepPublishing.get(); 
++i) {
+        totalRecords.addAndGet(publish1kRecords(topic, false));
+        try {
+          TimeUnit.MILLISECONDS.sleep(SCALE_UP_PUBLISH_INTERVAL_MILLIS);
+        }
+        catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
+    });
 
-    // Let the tasks work through the lag.
-    // Do not publish any more records so that the idleness causes scale-down
-    overlord.latchableEmitter().waitForEvent(
-        event -> event.hasMetricName("task/autoScaler/updatedCount")
-                      .hasDimension(DruidMetrics.SUPERVISOR_ID, 
supervisor.getId())
-                      .hasValueMatching(Matchers.equalTo(1L))
-    );
-    Assertions.assertEquals(1, getCurrentTaskCount(supervisor.getId()));
+    try {
+      overlord.latchableEmitter().waitForEvent(
+          event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC)
+                        .hasDimension(DruidMetrics.SUPERVISOR_ID, 
supervisor.getId())
+                        .hasValueMatching(Matchers.greaterThan(1L))
+      );
+
+      overlord.latchableEmitter().waitForEvent(
+          event -> event.hasMetricName("task/autoScaler/updatedCount")
+                        .hasDimension(DruidMetrics.SUPERVISOR_ID, 
supervisor.getId())
+                        .hasValueMatching(Matchers.greaterThan(1L))
+      );
+      keepPublishing.set(false);
+      publisherFuture.get(30, TimeUnit.SECONDS);
+      ITRetryUtil.retryUntilTrue(

Review Comment:
   [P2] Avoid the long default retry budget in this integration test
   
   ITRetryUtil.retryUntilTrue uses its default of 240 retries with 5 seconds 
between attempts, so this newly added check can add up to 20 minutes 
(240 x 5 s) to a failing run. The test method itself also has no @Timeout. 
Because this test already uses 600-second latch waits, a regression can now 
occupy CI for much longer before failing. Use a smaller retry limit tuned for 
this test, or add an explicit method timeout.



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java:
##########
@@ -215,34 +224,67 @@ public void 
test_autoScaler_scalesUpAndDown_withSlowPublish()
         .build(dataSource, topic);
     cluster.callApi().postSupervisor(supervisor);
 
-    // Ingest a large number of records to trigger a scale-up
-    // 10k records = 100 segments to publish * 100 rows per segment
-    int totalRecords = 0;
-    for (int i = 0; i < 10; ++i) {
-      totalRecords += publish1kRecords(topic, false);
-    }
+    overlord.latchableEmitter()
+            .waitForEvent(event -> event.hasMetricName("task/run/time")
+                                        .hasDimension(DruidMetrics.DATASOURCE, 
dataSource));
 
-    // Wait for tasks to scale up
-    overlord.latchableEmitter().waitForEvent(
-        event -> event.hasMetricName("task/autoScaler/updatedCount")
-                      .hasDimension(DruidMetrics.SUPERVISOR_ID, 
supervisor.getId())
-                      .hasValueMatching(Matchers.greaterThan(1L))
-    );
-    Assertions.assertEquals(4, getCurrentTaskCount(supervisor.getId()));
-    waitUntilPublishedRecordsAreIngested(totalRecords);
+    final AtomicBoolean keepPublishing = new AtomicBoolean(true);
+    final AtomicInteger totalRecords = new AtomicInteger();
+    final ExecutorService publisher = Executors.newSingleThreadExecutor();
+    final Future<?> publisherFuture = publisher.submit(() -> {
+      for (int i = 0; i < MAX_SCALE_UP_RECORD_BATCHES && keepPublishing.get(); 
++i) {
+        totalRecords.addAndGet(publish1kRecords(topic, false));
+        try {
+          TimeUnit.MILLISECONDS.sleep(SCALE_UP_PUBLISH_INTERVAL_MILLIS);
+        }
+        catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
+    });
 
-    // Let the tasks work through the lag.
-    // Do not publish any more records so that the idleness causes scale-down
-    overlord.latchableEmitter().waitForEvent(
-        event -> event.hasMetricName("task/autoScaler/updatedCount")
-                      .hasDimension(DruidMetrics.SUPERVISOR_ID, 
supervisor.getId())
-                      .hasValueMatching(Matchers.equalTo(1L))
-    );
-    Assertions.assertEquals(1, getCurrentTaskCount(supervisor.getId()));
+    try {
+      overlord.latchableEmitter().waitForEvent(
+          event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC)
+                        .hasDimension(DruidMetrics.SUPERVISOR_ID, 
supervisor.getId())
+                        .hasValueMatching(Matchers.greaterThan(1L))
+      );
+
+      overlord.latchableEmitter().waitForEvent(
+          event -> event.hasMetricName("task/autoScaler/updatedCount")
+                        .hasDimension(DruidMetrics.SUPERVISOR_ID, 
supervisor.getId())
+                        .hasValueMatching(Matchers.greaterThan(1L))
+      );
+      keepPublishing.set(false);
+      publisherFuture.get(30, TimeUnit.SECONDS);

Review Comment:
   [P2] Surface publisher failures before waiting for scaler metrics
   
   The background publisher's future is observed only after both autoscaler 
metric waits succeed. If publish1kRecords throws before creating enough lag, 
the test now waits for scaler metrics that may never arrive and then exits 
through the finally block without ever calling publisherFuture.get(), masking 
the real producer failure. The previous synchronous publish path failed 
immediately. Check the future while waiting, or observe it in the failure 
path, so that producer exceptions fail the test directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to