This is an automated email from the ASF dual-hosted git repository. Fly-Style pushed a commit to branch feat/poll-idle-ratio-replacement in repository https://gitbox.apache.org/repos/asf/druid.git
commit c16aee0c797b3042585ea9870c4178f623dca8c7
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Wed Jun 24 16:12:05 2026 +0300

    Cleanup
---
 .../autoscaler/CostBasedAutoScalerIntegrationTest.java | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)

diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
index 0c72d07d8e6..af79388f30a 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
@@ -38,6 +38,7 @@ import org.hamcrest.Matchers;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.joda.time.Duration;
 import org.joda.time.Period;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -46,6 +47,7 @@ import org.junit.jupiter.api.Timeout;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.OPTIMAL_TASK_COUNT_METRIC;
@@ -62,6 +64,7 @@ public class CostBasedAutoScalerIntegrationTest extends StreamIndexTestBase
 
   private String topic;
   private final KafkaResource kafkaServer = new KafkaResource();
+  private ExecutorService backgroundPublishExecutor;
 
   @Override
   protected StreamIngestResource<?> getStreamIngestResource()
@@ -84,6 +87,14 @@ public class CostBasedAutoScalerIntegrationTest extends StreamIndexTestBase
     kafkaServer.createTopicWithPartitions(topic, PARTITION_COUNT);
   }
 
+  @AfterEach
+  public void shutdownBackgroundPublishExecutor()
+  {
+    if (backgroundPublishExecutor != null) {
+      backgroundPublishExecutor.shutdownNow();
+    }
+  }
+
   @Test
   @Timeout(45)
   public void test_autoScaler_computesOptimalTaskCountAndProduceScaleDown()
@@ -137,7 +148,8 @@ public class CostBasedAutoScalerIntegrationTest extends StreamIndexTestBase
 
     // Produce additional records to create a backlog / lag
     // This ensures tasks are busy processing (low idle ratio)
-    Executors.newSingleThreadExecutor().submit(() -> {
+    backgroundPublishExecutor = Executors.newSingleThreadExecutor();
+    backgroundPublishExecutor.submit(() -> {
       for (int i = 0; i < 500; ++i) {
         publish1kRecords(topic, true);
       }
@@ -186,7 +198,8 @@ public class CostBasedAutoScalerIntegrationTest extends StreamIndexTestBase
     final int lowInitialTaskCount = 1;
 
     // This ensures tasks are busy processing (low idle ratio)
-    Executors.newSingleThreadExecutor().submit(() -> {
+    backgroundPublishExecutor = Executors.newSingleThreadExecutor();
+    backgroundPublishExecutor.submit(() -> {
       for (int i = 0; i < 500; ++i) {
         publish1kRecords(topic, true);
       }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
