Shekharrajak commented on code in PR #19594:
URL: https://github.com/apache/druid/pull/19594#discussion_r3430365526


##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java:
##########
@@ -215,34 +224,67 @@ public void 
test_autoScaler_scalesUpAndDown_withSlowPublish()
         .build(dataSource, topic);
     cluster.callApi().postSupervisor(supervisor);
 
-    // Ingest a large number of records to trigger a scale-up
-    // 10k records = 100 segments to publish * 100 rows per segment
-    int totalRecords = 0;
-    for (int i = 0; i < 10; ++i) {
-      totalRecords += publish1kRecords(topic, false);
-    }
+    overlord.latchableEmitter()
+            .waitForEvent(event -> event.hasMetricName("task/run/time")
+                                        .hasDimension(DruidMetrics.DATASOURCE, 
dataSource));
 
-    // Wait for tasks to scale up
-    overlord.latchableEmitter().waitForEvent(
-        event -> event.hasMetricName("task/autoScaler/updatedCount")
-                      .hasDimension(DruidMetrics.SUPERVISOR_ID, 
supervisor.getId())
-                      .hasValueMatching(Matchers.greaterThan(1L))
-    );
-    Assertions.assertEquals(4, getCurrentTaskCount(supervisor.getId()));
-    waitUntilPublishedRecordsAreIngested(totalRecords);
+    final AtomicBoolean keepPublishing = new AtomicBoolean(true);
+    final AtomicInteger totalRecords = new AtomicInteger();
+    final ExecutorService publisher = Executors.newSingleThreadExecutor();
+    final Future<?> publisherFuture = publisher.submit(() -> {
+      for (int i = 0; i < MAX_SCALE_UP_RECORD_BATCHES && keepPublishing.get(); 
++i) {
+        totalRecords.addAndGet(publish1kRecords(topic, false));
+        try {
+          TimeUnit.MILLISECONDS.sleep(SCALE_UP_PUBLISH_INTERVAL_MILLIS);
+        }
+        catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
+    });
 
-    // Let the tasks work through the lag.
-    // Do not publish any more records so that the idleness causes scale-down
-    overlord.latchableEmitter().waitForEvent(
-        event -> event.hasMetricName("task/autoScaler/updatedCount")
-                      .hasDimension(DruidMetrics.SUPERVISOR_ID, 
supervisor.getId())
-                      .hasValueMatching(Matchers.equalTo(1L))
-    );
-    Assertions.assertEquals(1, getCurrentTaskCount(supervisor.getId()));
+    try {
+      overlord.latchableEmitter().waitForEvent(
+          event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC)
+                        .hasDimension(DruidMetrics.SUPERVISOR_ID, 
supervisor.getId())
+                        .hasValueMatching(Matchers.greaterThan(1L))
+      );
+
+      overlord.latchableEmitter().waitForEvent(
+          event -> event.hasMetricName("task/autoScaler/updatedCount")
+                        .hasDimension(DruidMetrics.SUPERVISOR_ID, 
supervisor.getId())
+                        .hasValueMatching(Matchers.greaterThan(1L))
+      );
+      keepPublishing.set(false);
+      publisherFuture.get(30, TimeUnit.SECONDS);
+      ITRetryUtil.retryUntilTrue(
+          () -> getCurrentTaskCount(supervisor.getId()) > 1,
+          "supervisor task count to scale up"
+      );
+      waitUntilPublishedRecordsAreIngested(totalRecords.get());
+
+      // Let the tasks work through the lag.
+      // Do not publish any more records so that the idleness causes scale-down
+      overlord.latchableEmitter().waitForEvent(
+          event -> event.hasMetricName("task/autoScaler/updatedCount")
+                        .hasDimension(DruidMetrics.SUPERVISOR_ID, 
supervisor.getId())
+                        .hasValueMatching(Matchers.equalTo(1L))
+      );
+      ITRetryUtil.retryUntilEquals(
+          () -> getCurrentTaskCount(supervisor.getId()),
+          1,
+          "supervisor task count to scale down"
+      );
+    }
+    finally {
+      keepPublishing.set(false);
+      publisher.shutdownNow();
+      cluster.callApi().postSupervisor(supervisor.createSuspendedSpec());
+    }
 
-    cluster.callApi().postSupervisor(supervisor.createSuspendedSpec());
     cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, 
broker);
-    Assertions.assertEquals("10000", cluster.runSql("SELECT COUNT(*) FROM %s", 
dataSource));

Review Comment:
   Assert the final row count against the actual number of published records instead of the hard-coded `"10000"`.



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java:
##########
@@ -215,34 +224,67 @@ public void 
test_autoScaler_scalesUpAndDown_withSlowPublish()
         .build(dataSource, topic);
     cluster.callApi().postSupervisor(supervisor);
 
-    // Ingest a large number of records to trigger a scale-up
-    // 10k records = 100 segments to publish * 100 rows per segment
-    int totalRecords = 0;
-    for (int i = 0; i < 10; ++i) {
-      totalRecords += publish1kRecords(topic, false);
-    }
+    overlord.latchableEmitter()
+            .waitForEvent(event -> event.hasMetricName("task/run/time")
+                                        .hasDimension(DruidMetrics.DATASOURCE, 
dataSource));
 
-    // Wait for tasks to scale up
-    overlord.latchableEmitter().waitForEvent(
-        event -> event.hasMetricName("task/autoScaler/updatedCount")
-                      .hasDimension(DruidMetrics.SUPERVISOR_ID, 
supervisor.getId())
-                      .hasValueMatching(Matchers.greaterThan(1L))
-    );
-    Assertions.assertEquals(4, getCurrentTaskCount(supervisor.getId()));
-    waitUntilPublishedRecordsAreIngested(totalRecords);
+    final AtomicBoolean keepPublishing = new AtomicBoolean(true);
+    final AtomicInteger totalRecords = new AtomicInteger();
+    final ExecutorService publisher = Executors.newSingleThreadExecutor();
+    final Future<?> publisherFuture = publisher.submit(() -> {
+      for (int i = 0; i < MAX_SCALE_UP_RECORD_BATCHES && keepPublishing.get(); 
++i) {
+        totalRecords.addAndGet(publish1kRecords(topic, false));
+        try {
+          TimeUnit.MILLISECONDS.sleep(SCALE_UP_PUBLISH_INTERVAL_MILLIS);
+        }
+        catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
+    });
 
-    // Let the tasks work through the lag.
-    // Do not publish any more records so that the idleness causes scale-down
-    overlord.latchableEmitter().waitForEvent(
-        event -> event.hasMetricName("task/autoScaler/updatedCount")
-                      .hasDimension(DruidMetrics.SUPERVISOR_ID, 
supervisor.getId())
-                      .hasValueMatching(Matchers.equalTo(1L))
-    );
-    Assertions.assertEquals(1, getCurrentTaskCount(supervisor.getId()));
+    try {
+      overlord.latchableEmitter().waitForEvent(
+          event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC)
+                        .hasDimension(DruidMetrics.SUPERVISOR_ID, 
supervisor.getId())
+                        .hasValueMatching(Matchers.greaterThan(1L))
+      );
+
+      overlord.latchableEmitter().waitForEvent(
+          event -> event.hasMetricName("task/autoScaler/updatedCount")
+                        .hasDimension(DruidMetrics.SUPERVISOR_ID, 
supervisor.getId())
+                        .hasValueMatching(Matchers.greaterThan(1L))
+      );
+      keepPublishing.set(false);
+      publisherFuture.get(30, TimeUnit.SECONDS);
+      ITRetryUtil.retryUntilTrue(
+          () -> getCurrentTaskCount(supervisor.getId()) > 1,
+          "supervisor task count to scale up"
+      );
+      waitUntilPublishedRecordsAreIngested(totalRecords.get());
+
+      // Let the tasks work through the lag.
+      // Do not publish any more records so that the idleness causes scale-down
+      overlord.latchableEmitter().waitForEvent(
+          event -> event.hasMetricName("task/autoScaler/updatedCount")
+                        .hasDimension(DruidMetrics.SUPERVISOR_ID, 
supervisor.getId())
+                        .hasValueMatching(Matchers.equalTo(1L))
+      );
+      ITRetryUtil.retryUntilEquals(
+          () -> getCurrentTaskCount(supervisor.getId()),
+          1,
+          "supervisor task count to scale down"
+      );
+    }
+    finally {
+      keepPublishing.set(false);

Review Comment:
   Clean up the publisher and suspend the supervisor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to