This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d8fec81c29 Embedded tests for MSQ realtime queries. (#18201)
2d8fec81c29 is described below

commit 2d8fec81c29dd9c855d1d1cff2ddf9111a9c476a
Author: Gian Merlino <[email protected]>
AuthorDate: Sun Jul 6 22:45:30 2025 -0700

    Embedded tests for MSQ realtime queries. (#18201)
    
    * Embedded tests for MSQ realtime queries.
    
    This patch adds an `EmbeddedMSQRealtimeQueryTest`, and `EmbeddedMSQApis` to
    support it.
    
    This patch also moves various usages of RuntimeInfo to regular injection
    rather than using a static function, and creates a RuntimeInfoModule that is
    part of the embedded tests and delivers a normal, non-statically-injected
    RuntimeInfo.
    
    This was necessary because the MSQ tasks, as they start up within the
    multi-task Indexer, need to see more than 100 MB of total Indexer memory
    in order to believe that there is sufficient memory to move forwards.
    Static injection of RuntimeInfo makes this difficult, since the various
    servers end up clobbering each other, and it isn't guaranteed that the
    correct RuntimeInfo will be used.
    
    * Comments.
    
    * Add missing dependencies.
    
    * Revert "Add missing dependencies."
    
    This reverts commit 07520327b8b8a5f83c7baeb2368119ebed2397c1.
    
    * Test fixes.
    
    * Fix NPE in metrics emission.
    
    * Remove @Disabled from tests that now pass.
---
 embedded-tests/pom.xml                             |  37 ++
 .../testing/embedded/msq/EmbeddedMSQApis.java      | 133 ++++++++
 .../embedded/msq/EmbeddedMSQRealtimeQueryTest.java | 380 +++++++++++++++++++++
 .../indexing/kafka/simulate/KafkaResource.java     |   6 +
 .../lookup/namespace/JdbcCacheGenerator.java       |  13 +-
 .../server/lookup/namespace/UriCacheGenerator.java |  10 +-
 .../lookup/namespace/JdbcCacheGeneratorTest.java   |   3 +-
 .../namespace/NamespacedExtractorModuleTest.java   |   9 +-
 .../lookup/namespace/UriCacheGeneratorTest.java    |   5 +-
 .../lookup/namespace/cache/CacheSchedulerTest.java |   3 +-
 .../cache/JdbcExtractionNamespaceTest.java         |   3 +-
 .../druid/msq/dart/guice/DartControllerConfig.java |  15 +
 .../DartControllerMemoryManagementModule.java      |  13 +-
 .../guice/DartWorkerMemoryManagementModule.java    |   7 +-
 .../druid/msq/exec/MSQMetriceEventBuilder.java     |  11 +-
 .../msq/guice/IndexerMemoryManagementModule.java   |  10 +-
 .../msq/guice/PeonMemoryManagementModule.java      |   7 +-
 .../apache/druid/guice/PeonProcessingModule.java   |  13 +-
 .../apache/druid/indexing/common/TaskToolbox.java  |  19 +-
 .../druid/indexing/common/TaskToolboxFactory.java  |   7 +-
 .../druid/indexing/common/TaskToolboxTest.java     |   4 +-
 .../indexing/common/task/IngestionTestBase.java    |   3 +
 .../overlord/SingleTaskBackgroundRunnerTest.java   |   4 +-
 .../druid/indexing/overlord/TaskLifecycleTest.java |   4 +-
 .../indexing/overlord/TestTaskToolboxFactory.java  |  11 +-
 .../SeekableStreamAppenderatorConfigTest.java      |   4 +-
 .../SeekableStreamIndexTaskTestBase.java           |   4 +-
 .../indexing/worker/WorkerTaskManagerTest.java     |   4 +-
 .../indexing/worker/WorkerTaskMonitorTest.java     |   4 +-
 .../org/apache/druid/guice/RuntimeInfoModule.java  |  33 --
 .../apache/druid/guice/StartupInjectorBuilder.java |   5 +-
 .../apache/druid/query/DruidProcessingConfig.java  |  15 +-
 .../main/java/org/apache/druid/utils/JvmUtils.java |  14 +-
 .../core/ParametrizedUriEmitterConfigTest.java     |  14 -
 .../druid/query/DruidProcessingConfigTest.java     |  26 +-
 .../apache/druid/client/broker/BrokerClient.java   |   4 +-
 .../apache/druid/guice/BrokerProcessingModule.java |  14 +-
 .../apache/druid/guice/DruidProcessingModule.java  |  28 +-
 .../druid/rpc/indexing/NoopOverlordClient.java     |   6 +
 .../apache/druid/rpc/indexing/OverlordClient.java  |   9 +
 .../druid/rpc/indexing/OverlordClientImpl.java     |  17 +
 .../org/apache/druid/server/StatusResource.java    |  13 +-
 .../server/SubqueryGuardrailHelperProvider.java    |   9 +-
 .../druid/guice/BrokerProcessingModuleTest.java    |   6 +-
 .../druid/guice/DruidProcessingModuleTest.java     |  31 +-
 .../apache/druid/server/StatusResourceTest.java    |   4 +-
 .../druid/server/metrics/LatchableEmitter.java     |  19 +-
 .../java/org/apache/druid/cli/GuiceRunnable.java   |  11 +-
 .../main/java/org/apache/druid/cli/Version.java    |   6 +-
 .../testing/embedded/DruidServerResource.java      |  14 +-
 .../testing/embedded/EmbeddedDruidCluster.java     |   6 +
 .../testing/embedded/EmbeddedDruidServer.java      |  35 +-
 .../druid/testing/embedded/EmbeddedOverlord.java   |  13 +-
 .../druid/testing/embedded/EmbeddedZookeeper.java  |   6 +
 .../druid/testing/embedded/RuntimeInfoModule.java  |  61 ++++
 .../apache/druid/testing/embedded/TestFolder.java  |   8 +
 .../embedded/derby/InMemoryDerbyResource.java      |   6 +
 .../org.apache.druid.initialization.DruidModule    |   1 +
 58 files changed, 959 insertions(+), 221 deletions(-)

diff --git a/embedded-tests/pom.xml b/embedded-tests/pom.xml
index 15025a557bb..49684909f4a 100644
--- a/embedded-tests/pom.xml
+++ b/embedded-tests/pom.xml
@@ -55,11 +55,21 @@
       <artifactId>druid-kafka-indexing-service</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.druid.extensions</groupId>
+      <artifactId>druid-multi-stage-query</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.druid</groupId>
       <artifactId>druid-server</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-sql</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.druid.extensions</groupId>
       <artifactId>mysql-metadata-storage</artifactId>
@@ -102,6 +112,12 @@
       <type>test-jar</type>
       <version>${project.parent.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-sql</artifactId>
+      <version>${project.parent.version}</version>
+      <type>test-jar</type>
+    </dependency>
     <dependency>
       <groupId>org.apache.druid</groupId>
       <artifactId>druid-services</artifactId>
@@ -170,6 +186,27 @@
       <artifactId>jackson-dataformat-yaml</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${apache.kafka.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.testcontainers</groupId>
       <artifactId>testcontainers</artifactId>
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
new file mode 100644
index 00000000000..a53b42bb3ee
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.msq;
+
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.http.ClientSqlQuery;
+import org.apache.druid.query.http.SqlTaskStatus;
+import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Convenience APIs for issuing MSQ SQL queries.
+ */
+public class EmbeddedMSQApis
+{
+  private final EmbeddedDruidCluster cluster;
+  private final EmbeddedOverlord overlord;
+
+  public EmbeddedMSQApis(EmbeddedDruidCluster cluster, EmbeddedOverlord 
overlord)
+  {
+    this.cluster = cluster;
+    this.overlord = overlord;
+  }
+
+  /**
+   * Submits the given SQL query to any of the brokers (using {@code 
BrokerClient})
+   * of the cluster.
+   *
+   * @return The result of the SQL as a single CSV string.
+   *
+   * @see EmbeddedClusterApis#runSql(String, Object...) similar command for 
the default interactive SQL engine
+   */
+  public String runDartSql(String sql, Object... args)
+  {
+    return FutureUtils.getUnchecked(
+        cluster.anyBroker().submitSqlQuery(
+            new ClientSqlQuery(
+                StringUtils.format(sql, args),
+                ResultFormat.CSV.name(),
+                false,
+                false,
+                false,
+                Map.of(QueryContexts.ENGINE, DartSqlEngine.NAME),
+                null
+            )
+        ),
+        true
+    ).trim();
+  }
+
+  /**
+   * Submits the given SQL query to any of the brokers (using {@code 
BrokerClient})
+   * of the cluster. Waits for it to complete, then returns the query report.
+   *
+   * @return the MSQ task report payload of the completed query
+   */
+  public MSQTaskReportPayload runTaskSql(String sql, Object... args)
+  {
+    final SqlTaskStatus taskStatus =
+        FutureUtils.getUnchecked(
+            cluster.anyBroker().submitSqlTask(
+                new ClientSqlQuery(
+                    StringUtils.format(sql, args),
+                    ResultFormat.CSV.name(),
+                    false,
+                    false,
+                    false,
+                    null,
+                    null
+                )
+            ),
+            true
+        );
+
+    if (taskStatus.getState() != TaskState.RUNNING) {
+      throw DruidException.defensive(
+          "Task[%s] had unexpected state[%s]",
+          taskStatus.getTaskId(),
+          taskStatus.getState()
+      );
+    }
+
+    cluster.callApi().waitForTaskToSucceed(taskStatus.getTaskId(), overlord);
+
+    final TaskReport.ReportMap taskReport = FutureUtils.getUnchecked(
+        cluster.leaderOverlord().taskReportAsMap(taskStatus.getTaskId()),
+        true
+    );
+
+    final Optional<MSQTaskReport> report = 
taskReport.findReport(MSQTaskReport.REPORT_KEY);
+    final MSQTaskReportPayload taskReportPayload = 
report.map(MSQTaskReport::getPayload).orElse(null);
+
+    if (taskReportPayload == null) {
+      throw DruidException.defensive(
+          "No report of type[%s] found for task[%s]",
+          MSQTaskReport.REPORT_KEY,
+          taskStatus.getTaskId()
+      );
+    }
+
+    return taskReportPayload;
+  }
+}
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
new file mode 100644
index 00000000000..3962f81202d
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.msq;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import it.unimi.dsi.fastutil.bytes.ByteArrays;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.emitter.kafka.KafkaEmitter;
+import org.apache.druid.frame.testutil.FrameTestUtil;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexer.granularity.UniformGranularitySpec;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
+import org.apache.druid.indexing.kafka.simulate.KafkaResource;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.msq.dart.guice.DartControllerMemoryManagementModule;
+import org.apache.druid.msq.dart.guice.DartControllerModule;
+import org.apache.druid.msq.dart.guice.DartWorkerMemoryManagementModule;
+import org.apache.druid.msq.dart.guice.DartWorkerModule;
+import org.apache.druid.msq.guice.IndexerMemoryManagementModule;
+import org.apache.druid.msq.guice.MSQDurableStorageModule;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.msq.guice.MSQSqlModule;
+import org.apache.druid.msq.guice.SqlTaskModule;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.QueryableIndexCursorFactory;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.joda.time.Period;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Embedded test that streams the wikipedia test dataset into Kafka, ingests it with a
+ * {@code KafkaSupervisor}, and runs MSQ task and Dart queries against the realtime data.
+ */
+public class EmbeddedMSQRealtimeQueryTest extends EmbeddedClusterTestBase
+{
+  private static final Period TASK_DURATION = Period.hours(1);
+  private static final int TASK_COUNT = 2;
+
+  private final EmbeddedBroker broker = new EmbeddedBroker();
+  private final EmbeddedIndexer indexer = new EmbeddedIndexer();
+  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+  private final EmbeddedHistorical historical = new EmbeddedHistorical();
+  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+  private final EmbeddedRouter router = new EmbeddedRouter();
+  private final int totalRows = 
TestIndex.getMMappedWikipediaIndex().getNumRows();
+
+  private KafkaResource kafka;
+  private String topic;
+  private EmbeddedMSQApis msqApis;
+
+  @Override
+  public EmbeddedDruidCluster createCluster()
+  {
+    final EmbeddedDruidCluster cluster = 
EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper();
+
+    kafka = new KafkaResource();
+
+    coordinator.addProperty("druid.manager.segments.useIncrementalCache", 
"always");
+
+    overlord.addProperty("druid.manager.segments.useIncrementalCache", 
"always")
+            .addProperty("druid.manager.segments.pollDuration", "PT0.1s");
+
+    broker.addProperty("druid.msq.dart.controller.heapFraction", "0.9")
+          .addProperty("druid.query.default.context.maxConcurrentStages", "1");
+
+    historical.addProperty("druid.msq.dart.worker.heapFraction", "0.9")
+              .addProperty("druid.msq.dart.worker.concurrentQueries", "1");
+
+    indexer.setServerMemory(300_000_000) // to run 2x realtime and 2x MSQ tasks
+           .addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
+           // druid.processing.numThreads must be higher than # of MSQ tasks 
to avoid contention, because the realtime
+           // server is contacted in such a way that the processing thread is 
blocked
+           .addProperty("druid.processing.numThreads", "3")
+           .addProperty("druid.worker.capacity", "4");
+
+    cluster.addExtension(KafkaIndexTaskModule.class)
+           .addExtension(DartControllerModule.class)
+           .addExtension(DartWorkerModule.class)
+           .addExtension(DartControllerMemoryManagementModule.class)
+           .addExtension(DartControllerModule.class)
+           .addExtension(DartWorkerMemoryManagementModule.class)
+           .addExtension(DartWorkerModule.class)
+           .addExtension(IndexerMemoryManagementModule.class)
+           .addExtension(MSQDurableStorageModule.class)
+           .addExtension(MSQIndexingModule.class)
+           .addExtension(MSQSqlModule.class)
+           .addExtension(SqlTaskModule.class)
+           .addCommonProperty("druid.monitoring.emissionPeriod", "PT0.1s")
+           .addCommonProperty("druid.msq.dart.enabled", "true")
+           .useLatchableEmitter()
+           .addResource(kafka)
+           .addServer(coordinator)
+           .addServer(overlord)
+           .addServer(indexer)
+           .addServer(broker)
+           .addServer(historical)
+           .addServer(router);
+
+    return cluster;
+  }
+
+  @BeforeEach
+  void setUpEach()
+  {
+    msqApis = new EmbeddedMSQApis(cluster, overlord);
+    topic = dataSource = EmbeddedClusterApis.createTestDatasourceName();
+
+    // Create Kafka topic.
+    kafka.createTopicWithPartitions(topic, 2);
+
+    // Submit a supervisor.
+    final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisor();
+    final Map<String, String> startSupervisorResult =
+        cluster.callApi().onLeaderOverlord(o -> 
o.postSupervisor(kafkaSupervisorSpec));
+    Assertions.assertEquals(Map.of("id", dataSource), startSupervisorResult);
+
+    // Send data to Kafka.
+    final QueryableIndexCursorFactory wikiCursorFactory =
+        new QueryableIndexCursorFactory(TestIndex.getMMappedWikipediaIndex());
+    final RowSignature wikiSignature = wikiCursorFactory.getRowSignature();
+    kafka.produceRecordsToTopic(
+        FrameTestUtil.readRowsFromCursorFactory(wikiCursorFactory)
+                     .map(row -> {
+                       final Map<String, Object> rowMap = new 
LinkedHashMap<>();
+                       for (int i = 0; i < row.size(); i++) {
+                         rowMap.put(wikiSignature.getColumnName(i), 
row.get(i));
+                       }
+                       try {
+                         return new ProducerRecord<>(
+                             topic,
+                             ByteArrays.EMPTY_ARRAY,
+                             TestHelper.JSON_MAPPER.writeValueAsBytes(rowMap)
+                         );
+                       }
+                       catch (JsonProcessingException e) {
+                         throw new RuntimeException(e);
+                       }
+                     })
+                     .toList()
+    );
+
+    // Wait for it to be loaded.
+    indexer.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("ingest/events/processed")
+                      .hasDimension(DruidMetrics.DATASOURCE, 
Collections.singletonList(dataSource)),
+        agg -> agg.hasSumAtLeast(totalRows)
+    );
+  }
+
+  @AfterEach
+  void tearDownEach() throws ExecutionException, InterruptedException, 
IOException
+  {
+    final Map<String, String> terminateSupervisorResult =
+        cluster.callApi().onLeaderOverlord(o -> 
o.terminateSupervisor(dataSource));
+    Assertions.assertEquals(Map.of("id", dataSource), 
terminateSupervisorResult);
+
+    // Cancel all running tasks, so we don't need to wait for them to hand off 
their segments.
+    try (final CloseableIterator<TaskStatusPlus> it = 
cluster.leaderOverlord().taskStatuses(null, null, null).get()) {
+      while (it.hasNext()) {
+        cluster.leaderOverlord().cancelTask(it.next().getId());
+      }
+    }
+
+    kafka.deleteTopic(topic);
+  }
+
+  @Test
+  @Timeout(60)
+  public void test_selectCount_task_default()
+  {
+    final String sql = StringUtils.format("SELECT COUNT(*) FROM \"%s\"", 
dataSource);
+    final MSQTaskReportPayload payload = msqApis.runTaskSql(sql);
+
+    // By default tasks do not include realtime data; count is zero.
+    BaseCalciteQueryTest.assertResultsEquals(
+        sql,
+        Collections.singletonList(new Object[]{0}),
+        payload.getResults().getResults()
+    );
+  }
+
+  @Test
+  @Timeout(60)
+  public void test_selectCount_task_withRealtime()
+  {
+    final String sql = StringUtils.format(
+        "SET includeSegmentSource = 'REALTIME';\n"
+        + "SELECT COUNT(*) FROM \"%s\"",
+        dataSource
+    );
+
+    final MSQTaskReportPayload payload = msqApis.runTaskSql(sql);
+
+    BaseCalciteQueryTest.assertResultsEquals(
+        sql,
+        Collections.singletonList(new Object[]{totalRows}),
+        payload.getResults().getResults()
+    );
+  }
+
+  @Test
+  @Timeout(60)
+  public void test_selectCount_dart_default()
+  {
+    final String sql = StringUtils.format("SELECT COUNT(*) FROM \"%s\"", 
dataSource);
+    final long selectedCount = Long.parseLong(msqApis.runDartSql(sql));
+
+    // By default Dart includes realtime data.
+    Assertions.assertEquals(totalRows, selectedCount);
+  }
+
+  @Test
+  @Timeout(60)
+  public void test_selectCount_dart_noRealtime()
+  {
+    final String sql = StringUtils.format(
+        "SET includeSegmentSource = 'NONE';\n"
+        + "SELECT COUNT(*) FROM \"%s\"",
+        dataSource
+    );
+
+    final long selectedCount = Long.parseLong(msqApis.runDartSql(sql));
+    Assertions.assertEquals(0, selectedCount);
+  }
+
+  @Test
+  @Timeout(60)
+  @Disabled // Test does not currently pass, see 
https://github.com/apache/druid/issues/18198
+  public void test_selectJoin_dart()
+  {
+    final long selectedCount = Long.parseLong(
+        msqApis.runDartSql(
+            "SELECT COUNT(*) FROM \"%s\"\n"
+            + "WHERE countryName IN (\n"
+            + "  SELECT countryName\n"
+            + "  FROM \"%s\"\n"
+            + "  WHERE countryName IS NOT NULL\n"
+            + "  GROUP BY 1\n"
+            + "  ORDER BY COUNT(*) DESC\n"
+            + "  LIMIT 1\n"
+            + ")",
+            dataSource,
+            dataSource
+        )
+    );
+
+    Assertions.assertEquals(528, selectedCount);
+  }
+
+  @Test
+  @Timeout(60)
+  @Disabled // Test does not currently pass, see 
https://github.com/apache/druid/issues/18198
+  public void test_selectJoin_task_withRealtime()
+  {
+    final String sql = StringUtils.format(
+        "SET includeSegmentSource = 'REALTIME';\n"
+        + "SELECT COUNT(*) FROM \"%s\"\n"
+        + "WHERE countryName IN (\n"
+        + "  SELECT countryName\n"
+        + "  FROM \"%s\"\n"
+        + "  WHERE countryName IS NOT NULL\n"
+        + "  GROUP BY 1\n"
+        + "  ORDER BY COUNT(*) DESC\n"
+        + "  LIMIT 1\n"
+        + ")",
+        dataSource,
+        dataSource
+    );
+
+    final MSQTaskReportPayload payload = msqApis.runTaskSql(sql);
+
+    BaseCalciteQueryTest.assertResultsEquals(
+        sql,
+        Collections.singletonList(new Object[]{528}),
+        payload.getResults().getResults()
+    );
+  }
+
+  private KafkaSupervisorSpec createKafkaSupervisor()
+  {
+    final Period startDelay = Period.millis(10);
+    final Period supervisorRunPeriod = Period.millis(500);
+    final boolean useEarliestOffset = true;
+
+    return new KafkaSupervisorSpec(
+        dataSource,
+        null,
+        DataSchema.builder()
+                  .withDataSource(dataSource)
+                  .withTimestamp(new TimestampSpec("__time", "auto", null))
+                  .withGranularity(new 
UniformGranularitySpec(Granularities.DAY, null, null))
+                  
.withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build())
+                  .build(),
+        null,
+        new KafkaSupervisorIOConfig(
+            topic,
+            null,
+            new JsonInputFormat(null, null, null, null, null),
+            null,
+            TASK_COUNT,
+            TASK_DURATION,
+            kafka.consumerProperties(),
+            null,
+            null,
+            null,
+            startDelay,
+            supervisorRunPeriod,
+            useEarliestOffset,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null
+        ),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+  }
+}
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java
index d8a650f78fe..c109516e487 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java
@@ -133,6 +133,12 @@ public class KafkaResource extends 
TestcontainerResource<KafkaContainer>
     return Admin.create(commonClientProperties());
   }
 
+  @Override
+  public String toString()
+  {
+    return "KafkaResource";
+  }
+
   private KafkaProducer<byte[], byte[]> newProducer()
   {
     return new KafkaProducer<>(producerProperties());
diff --git 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/JdbcCacheGenerator.java
 
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/JdbcCacheGenerator.java
index ccf8504875d..810a4082c37 100644
--- 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/JdbcCacheGenerator.java
+++ 
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/JdbcCacheGenerator.java
@@ -20,6 +20,7 @@
 package org.apache.druid.server.lookup.namespace;
 
 import com.google.common.base.Strings;
+import com.google.inject.Inject;
 import org.apache.druid.data.input.MapPopulator;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.JodaUtils;
@@ -30,7 +31,7 @@ import org.apache.druid.query.lookup.namespace.CacheGenerator;
 import org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace;
 import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
 import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
-import org.apache.druid.utils.JvmUtils;
+import org.apache.druid.utils.RuntimeInfo;
 import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.ResultIterator;
@@ -51,9 +52,15 @@ public final class JdbcCacheGenerator implements 
CacheGenerator<JdbcExtractionNa
   private static final String NO_SUITABLE_DRIVER_FOUND_ERROR = "No suitable 
driver found";
   private static final String JDBC_DRIVER_JAR_FILES_MISSING_ERROR =
       "JDBC driver JAR files missing from 
extensions/druid-lookups-cached-global directory";
-  private static final long MAX_MEMORY = 
JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes();
   private final 
ConcurrentMap<CacheScheduler.EntryImpl<JdbcExtractionNamespace>, DBI> dbiCache =
       new ConcurrentHashMap<>();
+  private final long maxMemory;
+
+  @Inject
+  public JdbcCacheGenerator(RuntimeInfo runtimeInfo)
+  {
+    this.maxMemory = runtimeInfo.getMaxHeapSizeBytes();
+  }
 
   @Override
   @Nullable
@@ -100,7 +107,7 @@ public final class JdbcCacheGenerator implements 
CacheGenerator<JdbcExtractionNa
       final MapPopulator.PopulateResult populateResult = 
MapPopulator.populateAndWarnAtByteLimit(
           pairs,
           cache.getCache(),
-          (long) (MAX_MEMORY * namespace.getMaxHeapPercentage() / 100.0),
+          (long) (maxMemory * namespace.getMaxHeapPercentage() / 100.0),
           null == entryId ? null : entryId.toString()
       );
       final long duration = System.nanoTime() - startNs;
diff --git 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/UriCacheGenerator.java
 
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/UriCacheGenerator.java
index c42bc91d5bd..f30bda3a553 100644
--- 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/UriCacheGenerator.java
+++ 
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/UriCacheGenerator.java
@@ -33,7 +33,7 @@ import org.apache.druid.segment.loading.URIDataPuller;
 import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
 import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
 import org.apache.druid.utils.CompressionUtils;
-import org.apache.druid.utils.JvmUtils;
+import org.apache.druid.utils.RuntimeInfo;
 
 import javax.annotation.Nullable;
 import java.io.FileNotFoundException;
@@ -50,15 +50,17 @@ public final class UriCacheGenerator implements CacheGenerator<UriExtractionName
 {
   private static final int DEFAULT_NUM_RETRIES = 3;
   private static final Logger log = new Logger(UriCacheGenerator.class);
-  private static final long MAX_MEMORY = JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes();
   private final Map<String, SearchableVersionedDataFinder> pullers;
+  private final long maxMemory;
 
   @Inject
   public UriCacheGenerator(
-      Map<String, SearchableVersionedDataFinder> pullers
+      Map<String, SearchableVersionedDataFinder> pullers,
+      RuntimeInfo runtimeInfo
   )
   {
     this.pullers = pullers;
+    this.maxMemory = runtimeInfo.getMaxHeapSizeBytes();
   }
 
   @Override
@@ -151,7 +153,7 @@ public final class UriCacheGenerator implements CacheGenerator<UriExtractionName
             ).populateAndWarnAtByteLimit(
                 source,
                 cache.getCache(),
-                (long) (MAX_MEMORY * extractionNamespace.getMaxHeapPercentage() / 100.0),
+                (long) (maxMemory * extractionNamespace.getMaxHeapPercentage() / 100.0),
                 null == entryId ? null : entryId.toString()
             );
             final long duration = System.nanoTime() - startNs;
diff --git 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java
 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java
index 7162eac0a2d..8ba1f638f57 100644
--- 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java
+++ 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java
@@ -29,6 +29,7 @@ import 
org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
 import 
org.apache.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
 import 
org.apache.druid.server.lookup.namespace.cache.OnHeapNamespaceExtractionCacheManager;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.utils.JvmUtils;
 import org.easymock.EasyMock;
 import org.joda.time.Period;
 import org.junit.Before;
@@ -76,7 +77,7 @@ public class JdbcCacheGeneratorTest
   @Before
   public void setup()
   {
-    target = new JdbcCacheGenerator();
+    target = new JdbcCacheGenerator(JvmUtils.getRuntimeInfo());
   }
 
   @Test
diff --git 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/NamespacedExtractorModuleTest.java
 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/NamespacedExtractorModuleTest.java
index 4e04f034cab..852a05c6df8 100644
--- 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/NamespacedExtractorModuleTest.java
+++ 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/NamespacedExtractorModuleTest.java
@@ -35,6 +35,7 @@ import 
org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
 import 
org.apache.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
 import 
org.apache.druid.server.lookup.namespace.cache.OnHeapNamespaceExtractionCacheManager;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.utils.JvmUtils;
 import org.joda.time.Period;
 import org.junit.After;
 import org.junit.Assert;
@@ -71,9 +72,10 @@ public class NamespacedExtractorModuleTest
                 ImmutableMap.of(
                     "file",
                     new LocalFileTimestampVersionFinder()
-                )
+                ),
+                JvmUtils.getRuntimeInfo()
             ),
-            JdbcExtractionNamespace.class, new JdbcCacheGenerator()
+            JdbcExtractionNamespace.class, new 
JdbcCacheGenerator(JvmUtils.getRuntimeInfo())
         );
     lifecycle = new Lifecycle();
     lifecycle.start();
@@ -104,7 +106,8 @@ public class NamespacedExtractorModuleTest
       out.write(MAPPER.writeValueAsString(ImmutableMap.of("foo", "bar")));
     }
     final UriCacheGenerator factory = new UriCacheGenerator(
-        ImmutableMap.of("file", new LocalFileTimestampVersionFinder())
+        ImmutableMap.of("file", new LocalFileTimestampVersionFinder()),
+        JvmUtils.getRuntimeInfo()
     );
     final UriExtractionNamespace namespace = new UriExtractionNamespace(
         tmpFile.toURI(),
diff --git 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/UriCacheGeneratorTest.java
 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/UriCacheGeneratorTest.java
index 3caad9cec34..d792ac236de 100644
--- 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/UriCacheGeneratorTest.java
+++ 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/UriCacheGeneratorTest.java
@@ -38,6 +38,7 @@ import 
org.apache.druid.server.lookup.namespace.cache.NamespaceExtractionCacheMa
 import 
org.apache.druid.server.lookup.namespace.cache.OffHeapNamespaceExtractionCacheManager;
 import 
org.apache.druid.server.lookup.namespace.cache.OnHeapNamespaceExtractionCacheManager;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.utils.JvmUtils;
 import org.joda.time.Period;
 import org.junit.After;
 import org.junit.Assert;
@@ -241,7 +242,7 @@ public class UriCacheGeneratorTest
     this.cacheManager = cacheManagerCreator.apply(lifecycle);
     this.scheduler = new CacheScheduler(
         new NoopServiceEmitter(),
-        ImmutableMap.of(UriExtractionNamespace.class, new 
UriCacheGenerator(FINDERS)),
+        ImmutableMap.of(UriExtractionNamespace.class, new 
UriCacheGenerator(FINDERS, JvmUtils.getRuntimeInfo())),
         cacheManager
     );
   }
@@ -268,7 +269,7 @@ public class UriCacheGeneratorTest
           ""
       )));
     }
-    generator = new UriCacheGenerator(FINDERS);
+    generator = new UriCacheGenerator(FINDERS, JvmUtils.getRuntimeInfo());
     namespace = new UriExtractionNamespace(
         tmpFile.toURI(),
         null, null,
diff --git 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java
 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java
index b6deed7be5e..033b4060422 100644
--- 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java
+++ 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java
@@ -38,6 +38,7 @@ import 
org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
 import org.apache.druid.server.lookup.namespace.JdbcCacheGenerator;
 import org.apache.druid.server.lookup.namespace.NamespaceExtractionConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.utils.JvmUtils;
 import org.joda.time.Period;
 import org.junit.After;
 import org.junit.Assert;
@@ -152,7 +153,7 @@ public class CacheSchedulerTest
             UriExtractionNamespace.class,
             cacheGenerator,
             JdbcExtractionNamespace.class,
-            new JdbcCacheGenerator()
+            new JdbcCacheGenerator(JvmUtils.getRuntimeInfo())
         ),
         cacheManager
     );
diff --git 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java
 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java
index 6c164564f40..92142dcd1d0 100644
--- 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java
+++ 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java
@@ -40,6 +40,7 @@ import 
org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
 import org.apache.druid.server.lookup.namespace.JdbcCacheGenerator;
 import org.apache.druid.server.lookup.namespace.NamespaceExtractionConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.utils.JvmUtils;
 import org.joda.time.Period;
 import org.junit.After;
 import org.junit.Assert;
@@ -181,7 +182,7 @@ public class JdbcExtractionNamespaceTest
                   new CacheGenerator<JdbcExtractionNamespace>()
                   {
                     private final JdbcCacheGenerator delegate =
-                        new JdbcCacheGenerator();
+                        new JdbcCacheGenerator(JvmUtils.getRuntimeInfo());
 
                     @Override
                     public String generateCache(
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerConfig.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerConfig.java
index 25094f44a79..f97f895314c 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerConfig.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerConfig.java
@@ -20,18 +20,28 @@
 package org.apache.druid.msq.dart.guice;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.msq.exec.MemoryIntrospector;
 
 /**
  * Runtime configuration for controllers (which run on Brokers).
  */
 public class DartControllerConfig
 {
+  /**
+   * Allocate up to 15% of memory for the MSQ framework. This accounts for 
additional overhead due to native queries,
+   * the segment timeline, and lookups (which aren't accounted for by our 
{@link MemoryIntrospector}).
+   */
+  private static final double DEFAULT_HEAP_FRACTION = 0.15;
+
   @JsonProperty("concurrentQueries")
   private int concurrentQueries = 1;
 
   @JsonProperty("maxQueryReportSize")
   private int maxQueryReportSize = 100_000_000;
 
+  @JsonProperty("heapFraction")
+  private double heapFraction = DEFAULT_HEAP_FRACTION;
+
   public int getConcurrentQueries()
   {
     return concurrentQueries;
@@ -41,4 +51,9 @@ public class DartControllerConfig
   {
     return maxQueryReportSize;
   }
+
+  public double getHeapFraction()
+  {
+    return heapFraction;
+  }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerMemoryManagementModule.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerMemoryManagementModule.java
index 95f110ec88b..77ca55baae0 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerMemoryManagementModule.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerMemoryManagementModule.java
@@ -27,7 +27,7 @@ import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.msq.exec.MemoryIntrospector;
 import org.apache.druid.msq.exec.MemoryIntrospectorImpl;
 import org.apache.druid.query.DruidProcessingConfig;
-import org.apache.druid.utils.JvmUtils;
+import org.apache.druid.utils.RuntimeInfo;
 
 /**
  * Memory management module for Brokers.
@@ -35,12 +35,6 @@ import org.apache.druid.utils.JvmUtils;
 @LoadScope(roles = {NodeRole.BROKER_JSON_NAME})
 public class DartControllerMemoryManagementModule implements DruidModule
 {
-  /**
-   * Allocate up to 15% of memory for the MSQ framework. This accounts for additional overhead due to native queries,
-   * the segment timeline, and lookups (which aren't accounted for by our {@link MemoryIntrospector}).
-   */
-  public static final double USABLE_MEMORY_FRACTION = 0.15;
-
   @Override
   public void configure(Binder binder)
   {
@@ -49,13 +43,14 @@ public class DartControllerMemoryManagementModule 
implements DruidModule
 
   @Provides
   public MemoryIntrospector createMemoryIntrospector(
+      final RuntimeInfo runtimeInfo,
       final DruidProcessingConfig processingConfig,
       final DartControllerConfig controllerConfig
   )
   {
     return new MemoryIntrospectorImpl(
-        JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes(),
-        USABLE_MEMORY_FRACTION,
+        runtimeInfo.getMaxHeapSizeBytes(),
+        controllerConfig.getHeapFraction(),
         controllerConfig.getConcurrentQueries(),
         processingConfig.getNumThreads(),
         null
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerMemoryManagementModule.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerMemoryManagementModule.java
index 9f51a65152a..2dff7989c04 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerMemoryManagementModule.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerMemoryManagementModule.java
@@ -34,7 +34,7 @@ import org.apache.druid.msq.exec.MemoryIntrospector;
 import org.apache.druid.msq.exec.MemoryIntrospectorImpl;
 import org.apache.druid.msq.exec.ProcessingBuffersProvider;
 import org.apache.druid.query.DruidProcessingConfig;
-import org.apache.druid.utils.JvmUtils;
+import org.apache.druid.utils.RuntimeInfo;
 
 import java.nio.ByteBuffer;
 
@@ -53,11 +53,12 @@ public class DartWorkerMemoryManagementModule implements 
DruidModule
   @Provides
   public MemoryIntrospector createMemoryIntrospector(
       final DartWorkerConfig workerConfig,
-      final DruidProcessingConfig druidProcessingConfig
+      final DruidProcessingConfig druidProcessingConfig,
+      final RuntimeInfo runtimeInfo
   )
   {
     return new MemoryIntrospectorImpl(
-        JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes(),
+        runtimeInfo.getMaxHeapSizeBytes(),
         workerConfig.getHeapFraction(),
         computeConcurrentQueries(workerConfig, druidProcessingConfig),
         druidProcessingConfig.getNumThreads(),
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MSQMetriceEventBuilder.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MSQMetriceEventBuilder.java
index 8058b2564e2..ad1284197df 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MSQMetriceEventBuilder.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MSQMetriceEventBuilder.java
@@ -34,6 +34,11 @@ import java.util.Map;
  */
 public class MSQMetriceEventBuilder extends ServiceMetricEvent.Builder
 {
+  /**
+   * Value to emit for {@link DruidMetrics#TYPE} in query metrics.
+   */
+  private static final String QUERY_METRIC_TYPE = "msq";
+
   /**
    * Sets dimensions for Dart queries, as well as some common dimensions. Sets 
the following dimensions:
    * <ul>
@@ -84,8 +89,8 @@ public class MSQMetriceEventBuilder extends ServiceMetricEvent.Builder
 
   private void setQueryIdDimensions(final QueryContext queryContext)
   {
-    setDimension(BaseQuery.QUERY_ID, queryContext.get(BaseQuery.QUERY_ID));
-    setDimension(BaseQuery.SQL_QUERY_ID, queryContext.get(BaseQuery.SQL_QUERY_ID));
-    setDimension(DruidMetrics.TYPE, "msq");
+    setDimensionIfNotNull(BaseQuery.QUERY_ID, queryContext.getString(BaseQuery.QUERY_ID));
+    setDimensionIfNotNull(BaseQuery.SQL_QUERY_ID, queryContext.getString(BaseQuery.SQL_QUERY_ID));
+    setDimension(DruidMetrics.TYPE, QUERY_METRIC_TYPE);
   }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/IndexerMemoryManagementModule.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/IndexerMemoryManagementModule.java
index 61f03e40ab6..4310d55fbd4 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/IndexerMemoryManagementModule.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/IndexerMemoryManagementModule.java
@@ -33,7 +33,7 @@ import org.apache.druid.msq.exec.ProcessingBuffersProvider;
 import org.apache.druid.msq.indexing.IndexerProcessingBuffersProvider;
 import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
-import org.apache.druid.utils.JvmUtils;
+import org.apache.druid.utils.RuntimeInfo;
 
 /**
  * Provides {@link MemoryIntrospector} for multi-task-per-JVM model.
@@ -63,6 +63,7 @@ public class IndexerMemoryManagementModule implements 
DruidModule
   @Provides
   @ManageLifecycle
   public MemoryIntrospector createMemoryIntrospector(
+      final RuntimeInfo runtimeInfo,
       final LookupExtractorFactoryContainerProvider lookupProvider,
       final TaskMemoryManagementConfig taskMemoryManagementConfig,
       final DruidProcessingConfig processingConfig,
@@ -70,7 +71,7 @@ public class IndexerMemoryManagementModule implements 
DruidModule
   )
   {
     return new MemoryIntrospectorImpl(
-        JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes(),
+        runtimeInfo.getMaxHeapSizeBytes(),
         MSQ_MEMORY_FRACTION,
         workerConfig.getCapacity(),
         PeonMemoryManagementModule.getNumThreads(taskMemoryManagementConfig, 
processingConfig),
@@ -82,11 +83,12 @@ public class IndexerMemoryManagementModule implements 
DruidModule
   @LazySingleton
   public ProcessingBuffersProvider createProcessingBuffersProvider(
       final MemoryIntrospector memoryIntrospector,
-      final WorkerConfig workerConfig
+      final WorkerConfig workerConfig,
+      final RuntimeInfo runtimeInfo
   )
   {
     return new IndexerProcessingBuffersProvider(
-        (long) (JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() * 
PROCESSING_MEMORY_FRACTION),
+        (long) (runtimeInfo.getMaxHeapSizeBytes() * 
PROCESSING_MEMORY_FRACTION),
         workerConfig.getCapacity(),
         memoryIntrospector.numProcessingThreads()
     );
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/PeonMemoryManagementModule.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/PeonMemoryManagementModule.java
index 39265434584..a404e229f82 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/PeonMemoryManagementModule.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/PeonMemoryManagementModule.java
@@ -34,7 +34,7 @@ import org.apache.druid.msq.exec.ProcessingBuffersProvider;
 import org.apache.druid.msq.indexing.PeonProcessingBuffersProvider;
 import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
-import org.apache.druid.utils.JvmUtils;
+import org.apache.druid.utils.RuntimeInfo;
 
 import java.nio.ByteBuffer;
 
@@ -68,11 +68,12 @@ public class PeonMemoryManagementModule implements 
DruidModule
   public MemoryIntrospector createMemoryIntrospector(
       final LookupExtractorFactoryContainerProvider lookupProvider,
       final DruidProcessingConfig processingConfig,
-      final TaskMemoryManagementConfig taskMemoryManagementConfig
+      final TaskMemoryManagementConfig taskMemoryManagementConfig,
+      final RuntimeInfo runtimeInfo
   )
   {
     return new MemoryIntrospectorImpl(
-        JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes(),
+        runtimeInfo.getMaxHeapSizeBytes(),
         USABLE_MEMORY_FRACTION,
         NUM_WORKERS_IN_JVM,
         getNumThreads(taskMemoryManagementConfig, processingConfig),
diff --git 
a/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java
 
b/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java
index 8961da7a555..e3bbeb1efaf 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java
@@ -42,6 +42,7 @@ import org.apache.druid.query.NoopQueryProcessingPool;
 import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
+import org.apache.druid.utils.RuntimeInfo;
 
 import java.nio.ByteBuffer;
 
@@ -101,10 +102,14 @@ public class PeonProcessingModule implements Module
   @Provides
   @LazySingleton
   @Global
-  public NonBlockingPool<ByteBuffer> getIntermediateResultsPool(Task task, 
DruidProcessingConfig config)
+  public NonBlockingPool<ByteBuffer> getIntermediateResultsPool(
+      Task task,
+      DruidProcessingConfig config,
+      RuntimeInfo runtimeInfo
+  )
   {
     if (task.supportsQueries()) {
-      return DruidProcessingModule.createIntermediateResultsPool(config);
+      return DruidProcessingModule.createIntermediateResultsPool(config, 
runtimeInfo);
     } else {
       return DummyNonBlockingPool.instance();
     }
@@ -113,10 +118,10 @@ public class PeonProcessingModule implements Module
   @Provides
   @LazySingleton
   @Merging
-  public BlockingPool<ByteBuffer> getMergeBufferPool(Task task, 
DruidProcessingConfig config)
+  public BlockingPool<ByteBuffer> getMergeBufferPool(Task task, 
DruidProcessingConfig config, RuntimeInfo runtimeInfo)
   {
     if (task.supportsQueries()) {
-      return DruidProcessingModule.createMergeBufferPool(config);
+      return DruidProcessingModule.createMergeBufferPool(config, runtimeInfo);
     } else {
       if (config.isNumMergeBuffersConfigured()) {
         log.warn(
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
index 8f67794dcb8..4d6e0b0a17d 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
@@ -69,7 +69,6 @@ import 
org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.tasklogs.TaskLogPusher;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.utils.JvmUtils;
 import org.apache.druid.utils.RuntimeInfo;
 import org.joda.time.Interval;
 
@@ -141,6 +140,7 @@ public class TaskToolbox
   private final TaskLogPusher taskLogPusher;
   private final String attemptId;
   private final CentralizedDatasourceSchemaConfig 
centralizedDatasourceSchemaConfig;
+  private final RuntimeInfo runtimeInfo;
 
   public TaskToolbox(
       SegmentLoaderConfig segmentLoaderConfig,
@@ -185,7 +185,8 @@ public class TaskToolbox
       ShuffleClient shuffleClient,
       TaskLogPusher taskLogPusher,
       String attemptId,
-      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
+      RuntimeInfo runtimeInfo
   )
   {
     this.segmentLoaderConfig = segmentLoaderConfig;
@@ -232,6 +233,7 @@ public class TaskToolbox
     this.taskLogPusher = taskLogPusher;
     this.attemptId = attemptId;
     this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
+    this.runtimeInfo = runtimeInfo;
   }
 
   public SegmentLoaderConfig getSegmentLoaderConfig()
@@ -516,7 +518,7 @@ public class TaskToolbox
    */
   public RuntimeInfo getAdjustedRuntimeInfo()
   {
-    return createAdjustedRuntimeInfo(JvmUtils.getRuntimeInfo(), 
appenderatorsManager);
+    return createAdjustedRuntimeInfo(runtimeInfo, appenderatorsManager);
   }
 
   public CentralizedDatasourceSchemaConfig getCentralizedTableSchemaConfig()
@@ -593,6 +595,7 @@ public class TaskToolbox
     private TaskLogPusher taskLogPusher;
     private String attemptId;
     private CentralizedDatasourceSchemaConfig 
centralizedDatasourceSchemaConfig;
+    private RuntimeInfo runtimeInfo;
 
     public Builder()
     {
@@ -641,6 +644,7 @@ public class TaskToolbox
       this.supervisorTaskClientProvider = other.supervisorTaskClientProvider;
       this.shuffleClient = other.shuffleClient;
       this.centralizedDatasourceSchemaConfig = 
other.centralizedDatasourceSchemaConfig;
+      this.runtimeInfo = other.runtimeInfo;
     }
 
     public Builder config(final SegmentLoaderConfig segmentLoaderConfig)
@@ -901,6 +905,12 @@ public class TaskToolbox
       return this;
     }
 
+    public Builder runtimeInfo(final RuntimeInfo runtimeInfo)
+    {
+      this.runtimeInfo = runtimeInfo;
+      return this;
+    }
+
     public TaskToolbox build()
     {
       return new TaskToolbox(
@@ -946,7 +956,8 @@ public class TaskToolbox
           shuffleClient,
           taskLogPusher,
           attemptId,
-          centralizedDatasourceSchemaConfig
+          centralizedDatasourceSchemaConfig,
+          runtimeInfo
       );
     }
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
index 188b6b0962a..3371a84e385 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
@@ -68,6 +68,7 @@ import 
org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.utils.RuntimeInfo;
 
 import java.io.File;
 import java.util.function.Function;
@@ -121,6 +122,7 @@ public class TaskToolboxFactory
   private final TaskLogPusher taskLogPusher;
   private final String attemptId;
   private final CentralizedDatasourceSchemaConfig 
centralizedDatasourceSchemaConfig;
+  private final RuntimeInfo runtimeInfo;
 
   @Inject
   public TaskToolboxFactory(
@@ -165,7 +167,8 @@ public class TaskToolboxFactory
       ShuffleClient shuffleClient,
       TaskLogPusher taskLogPusher,
       @AttemptId String attemptId,
-      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
+      RuntimeInfo runtimeInfo
   )
   {
     this.segmentLoaderConfig = segmentLoadConfig;
@@ -210,6 +213,7 @@ public class TaskToolboxFactory
     this.taskLogPusher = taskLogPusher;
     this.attemptId = attemptId;
     this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
+    this.runtimeInfo = runtimeInfo;
   }
 
   public TaskToolbox build(Task task)
@@ -276,6 +280,7 @@ public class TaskToolboxFactory
         .taskLogPusher(taskLogPusher)
         .attemptId(attemptId)
         .centralizedTableSchemaConfig(centralizedDatasourceSchemaConfig)
+        .runtimeInfo(runtimeInfo)
         .build();
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
index 9b64ad318e3..2986171325f 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
@@ -60,6 +60,7 @@ import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
 import org.apache.druid.server.security.AuthTestUtils;
+import org.apache.druid.utils.JvmUtils;
 import org.apache.druid.utils.RuntimeInfo;
 import org.easymock.EasyMock;
 import org.junit.Assert;
@@ -160,7 +161,8 @@ public class TaskToolboxTest
         null,
         null,
         "1",
-        CentralizedDatasourceSchemaConfig.create()
+        CentralizedDatasourceSchemaConfig.create(),
+        JvmUtils.getRuntimeInfo()
     );
   }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index 535394238a7..80c93cbdf58 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -96,6 +96,7 @@ import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.utils.JvmUtils;
 import org.joda.time.Period;
 import org.junit.After;
 import org.junit.Assert;
@@ -314,6 +315,7 @@ public abstract class IngestionTestBase extends 
InitializedNullHandlingTest
         .taskLogPusher(null)
         .attemptId("1")
         .centralizedTableSchemaConfig(centralizedDatasourceSchemaConfig)
+        .runtimeInfo(JvmUtils.getRuntimeInfo())
         .build();
   }
 
@@ -532,6 +534,7 @@ public abstract class IngestionTestBase extends 
InitializedNullHandlingTest
             .taskLogPusher(null)
             .attemptId("1")
             .centralizedTableSchemaConfig(centralizedDatasourceSchemaConfig)
+            .runtimeInfo(JvmUtils.getRuntimeInfo())
             .build();
 
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index 8e59460f93c..0e6fce4517a 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -61,6 +61,7 @@ import 
org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
 import org.apache.druid.server.initialization.ServerConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.security.AuthTestUtils;
+import org.apache.druid.utils.JvmUtils;
 import org.easymock.EasyMock;
 import org.hamcrest.CoreMatchers;
 import org.junit.After;
@@ -141,7 +142,8 @@ public class SingleTaskBackgroundRunnerTest
         null,
         null,
         "1",
-        CentralizedDatasourceSchemaConfig.create()
+        CentralizedDatasourceSchemaConfig.create(),
+        JvmUtils.getRuntimeInfo()
     );
     runner = new SingleTaskBackgroundRunner(
         toolboxFactory,
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 31fabcf3227..0ff20f447f8 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -140,6 +140,7 @@ import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.apache.druid.utils.JvmUtils;
 import org.easymock.EasyMock;
 import org.joda.time.DateTime;
 import org.joda.time.Hours;
@@ -616,7 +617,8 @@ public class TaskLifecycleTest extends 
InitializedNullHandlingTest
         null,
         null,
         "1",
-        CentralizedDatasourceSchemaConfig.create()
+        CentralizedDatasourceSchemaConfig.create(),
+        JvmUtils.getRuntimeInfo()
     );
   }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
index 8dd8064b4d5..477bb65b828 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
@@ -67,6 +67,7 @@ import 
org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.utils.RuntimeInfo;
 
 public class TestTaskToolboxFactory extends TaskToolboxFactory
 {
@@ -122,7 +123,8 @@ public class TestTaskToolboxFactory extends 
TaskToolboxFactory
         bob.shuffleClient,
         bob.taskLogPusher,
         bob.attemptId,
-        bob.centralizedDatasourceSchemaConfig
+        bob.centralizedDatasourceSchemaConfig,
+        bob.runtimeInfo
     );
   }
 
@@ -169,6 +171,7 @@ public class TestTaskToolboxFactory extends 
TaskToolboxFactory
     private TaskLogPusher taskLogPusher;
     private String attemptId;
     private CentralizedDatasourceSchemaConfig 
centralizedDatasourceSchemaConfig;
+    private RuntimeInfo runtimeInfo;
 
     public Builder setConfig(TaskConfig config)
     {
@@ -415,5 +418,11 @@ public class TestTaskToolboxFactory extends 
TaskToolboxFactory
       this.centralizedDatasourceSchemaConfig = 
centralizedDatasourceSchemaConfig;
       return this;
     }
+
+    public Builder setRuntimeInfo(RuntimeInfo runtimeInfo)
+    {
+      this.runtimeInfo = runtimeInfo;
+      return this;
+    }
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
index 2008ee09360..2cc2a1b6c25 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
@@ -30,6 +30,7 @@ import org.apache.druid.segment.data.CompressionStrategy;
 import org.apache.druid.segment.incremental.AppendableIndexBuilder;
 import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.indexing.TuningConfig;
+import org.apache.druid.utils.JvmUtils;
 import org.apache.druid.utils.RuntimeInfo;
 import org.joda.time.Period;
 import org.junit.Assert;
@@ -195,7 +196,8 @@ public class SeekableStreamAppenderatorConfigTest
           null,
           null,
           new 
DruidProcessingBufferConfig(HumanReadableBytes.valueOf(bufferSize), null, null),
-          null
+          null,
+          JvmUtils.getRuntimeInfo()
       );
     }
   }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index 3876c32ce18..8090f45dd0c 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -129,6 +129,7 @@ import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.utils.CompressionUtils;
+import org.apache.druid.utils.JvmUtils;
 import org.assertj.core.api.Assertions;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
@@ -713,7 +714,8 @@ public abstract class SeekableStreamIndexTaskTestBase 
extends EasyMockSupport
         null,
         null,
         "1",
-        CentralizedDatasourceSchemaConfig.create()
+        CentralizedDatasourceSchemaConfig.create(),
+        JvmUtils.getRuntimeInfo()
     );
   }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index 445e9469c9d..0a47f687f3c 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -57,6 +57,7 @@ import 
org.apache.druid.segment.realtime.NoopChatHandlerProvider;
 import org.apache.druid.server.coordination.ChangeRequestHistory;
 import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
 import org.apache.druid.server.security.AuthTestUtils;
+import org.apache.druid.utils.JvmUtils;
 import org.easymock.EasyMock;
 import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
@@ -171,7 +172,8 @@ public class WorkerTaskManagerTest
                 null,
                 null,
                 "1",
-                CentralizedDatasourceSchemaConfig.create()
+                CentralizedDatasourceSchemaConfig.create(),
+                JvmUtils.getRuntimeInfo()
             ),
             taskConfig,
             location
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
index 79d96fde860..da7184b3995 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -65,6 +65,7 @@ import org.apache.druid.server.initialization.ServerConfig;
 import org.apache.druid.server.initialization.ZkPathsConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.security.AuthTestUtils;
+import org.apache.druid.utils.JvmUtils;
 import org.easymock.EasyMock;
 import org.joda.time.Period;
 import org.junit.After;
@@ -216,7 +217,8 @@ public class WorkerTaskMonitorTest
                 null,
                 null,
                 "1",
-                CentralizedDatasourceSchemaConfig.create()
+                CentralizedDatasourceSchemaConfig.create(),
+                JvmUtils.getRuntimeInfo()
             ),
             taskConfig,
             new NoopServiceEmitter(),
diff --git 
a/processing/src/main/java/org/apache/druid/guice/RuntimeInfoModule.java 
b/processing/src/main/java/org/apache/druid/guice/RuntimeInfoModule.java
deleted file mode 100644
index c09c1330ccb..00000000000
--- a/processing/src/main/java/org/apache/druid/guice/RuntimeInfoModule.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.guice;
-
-import com.google.inject.Binder;
-import com.google.inject.Module;
-import org.apache.druid.utils.RuntimeInfo;
-
-public class RuntimeInfoModule implements Module
-{
-  @Override
-  public void configure(Binder binder)
-  {
-    binder.requestStaticInjection(RuntimeInfo.class);
-  }
-}
diff --git 
a/processing/src/main/java/org/apache/druid/guice/StartupInjectorBuilder.java 
b/processing/src/main/java/org/apache/druid/guice/StartupInjectorBuilder.java
index d71ba6d3753..b98f9e251eb 100644
--- 
a/processing/src/main/java/org/apache/druid/guice/StartupInjectorBuilder.java
+++ 
b/processing/src/main/java/org/apache/druid/guice/StartupInjectorBuilder.java
@@ -94,10 +94,7 @@ public class StartupInjectorBuilder extends 
BaseInjectorBuilder<StartupInjectorB
   public StartupInjectorBuilder forServer()
   {
     withExtensions();
-    add(
-        new PropertiesModule(Arrays.asList("common.runtime.properties", 
"runtime.properties")),
-        new RuntimeInfoModule()
-    );
+    add(new PropertiesModule(Arrays.asList("common.runtime.properties", 
"runtime.properties")));
     return this;
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java 
b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
index de5859e8820..c469226a62e 100644
--- a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
+++ b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.query;
 
+import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
@@ -28,6 +29,7 @@ import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.utils.JvmUtils;
+import org.apache.druid.utils.RuntimeInfo;
 
 import javax.annotation.Nullable;
 import java.util.concurrent.atomic.AtomicReference;
@@ -62,13 +64,14 @@ public class DruidProcessingConfig implements ColumnConfig
       @JsonProperty("fifo") @Nullable Boolean fifo,
       @JsonProperty("tmpDir") @Nullable String tmpDir,
       @JsonProperty("buffer") DruidProcessingBufferConfig buffer,
-      @JsonProperty("indexes") DruidProcessingIndexesConfig indexes
+      @JsonProperty("indexes") DruidProcessingIndexesConfig indexes,
+      @JacksonInject RuntimeInfo runtimeInfo
   )
   {
     this.formatString = Configs.valueOrDefault(formatString, "processing-%s");
     this.numThreads = Configs.valueOrDefault(
         numThreads,
-        Math.max(JvmUtils.getRuntimeInfo().getAvailableProcessors() - 1, 1)
+        Math.max(runtimeInfo.getAvailableProcessors() - 1, 1)
     );
     this.numMergeBuffers = Configs.valueOrDefault(numMergeBuffers, Math.max(2, 
this.numThreads / 4));
     this.fifo = fifo == null || fifo;
@@ -78,16 +81,16 @@ public class DruidProcessingConfig implements ColumnConfig
 
     this.numThreadsConfigured = numThreads != null;
     this.numMergeBuffersConfigured = numMergeBuffers != null;
-    initializeBufferSize();
+    initializeBufferSize(runtimeInfo);
   }
 
   @VisibleForTesting
   public DruidProcessingConfig()
   {
-    this(null, null, null, null, null, null, null);
+    this(null, null, null, null, null, null, null, JvmUtils.getRuntimeInfo());
   }
 
-  private void initializeBufferSize()
+  private void initializeBufferSize(RuntimeInfo runtimeInfo)
   {
     HumanReadableBytes sizeBytesConfigured = this.buffer.getBufferSize();
     if 
(!DruidProcessingBufferConfig.DEFAULT_PROCESSING_BUFFER_SIZE_BYTES.equals(sizeBytesConfigured))
 {
@@ -99,7 +102,7 @@ public class DruidProcessingConfig implements ColumnConfig
 
     long directSizeBytes;
     try {
-      directSizeBytes = JvmUtils.getRuntimeInfo().getDirectMemorySizeBytes();
+      directSizeBytes = runtimeInfo.getDirectMemorySizeBytes();
       log.info(
           "Detected max direct memory size of [%,d] bytes",
           directSizeBytes
diff --git a/processing/src/main/java/org/apache/druid/utils/JvmUtils.java 
b/processing/src/main/java/org/apache/druid/utils/JvmUtils.java
index 696a15aaee2..c5c502ca156 100644
--- a/processing/src/main/java/org/apache/druid/utils/JvmUtils.java
+++ b/processing/src/main/java/org/apache/druid/utils/JvmUtils.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.utils;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
 import com.google.inject.Inject;
 
@@ -71,6 +70,10 @@ public class JvmUtils
     return MAJOR_VERSION;
   }
 
+  /**
+   * Deprecated, inject {@link RuntimeInfo} instead of using this function.
+   */
+  @Deprecated
   public static RuntimeInfo getRuntimeInfo()
   {
     return RUNTIME_INFO;
@@ -136,13 +139,4 @@ public class JvmUtils
     ).collect(Collectors.toList());
     return jobURLs;
   }
-
-  /**
-   * Only for testing.
-   */
-  @VisibleForTesting
-  public static void resetTestsToDefaultRuntimeInfo()
-  {
-    RUNTIME_INFO = new RuntimeInfo();
-  }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/emitter/core/ParametrizedUriEmitterConfigTest.java
 
b/processing/src/test/java/org/apache/druid/java/util/emitter/core/ParametrizedUriEmitterConfigTest.java
index e99b1d4f9d6..8e548cbd350 100644
--- 
a/processing/src/test/java/org/apache/druid/java/util/emitter/core/ParametrizedUriEmitterConfigTest.java
+++ 
b/processing/src/test/java/org/apache/druid/java/util/emitter/core/ParametrizedUriEmitterConfigTest.java
@@ -22,9 +22,6 @@ package org.apache.druid.java.util.emitter.core;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
-import org.apache.druid.utils.JvmUtils;
-import org.apache.druid.utils.RuntimeInfo;
-import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -41,11 +38,6 @@ public class ParametrizedUriEmitterConfigTest
   {
     return Guice.createInjector(
         binder -> {
-          JvmUtils.resetTestsToDefaultRuntimeInfo();
-          binder.bind(RuntimeInfo.class)
-                .toInstance(JvmUtils.getRuntimeInfo());
-          binder.requestStaticInjection(JvmUtils.class);
-
           final ParametrizedUriEmitterConfig paramConfig = new 
ObjectMapper().convertValue(
               Emitters.makeCustomFactoryMap(props), 
ParametrizedUriEmitterConfig.class);
          final HttpEmitterConfig httpEmitterConfig = 
paramConfig.buildHttpEmitterConfig("http://example.com/topic");
@@ -54,12 +46,6 @@ public class ParametrizedUriEmitterConfigTest
     );
   }
 
-  @AfterClass
-  public static void teardown()
-  {
-    JvmUtils.resetTestsToDefaultRuntimeInfo();
-  }
-
   @Test
   public void testDefaults()
   {
diff --git 
a/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java
 
b/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java
index 2be92dcd31e..9b4feb29f46 100644
--- 
a/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java
@@ -19,14 +19,14 @@
 
 package org.apache.druid.query;
 
+import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.ProvisionException;
 import org.apache.druid.error.ExceptionMatcher;
+import org.apache.druid.guice.DruidSecondaryModule;
 import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.StartupInjectorBuilder;
-import org.apache.druid.utils.JvmUtils;
 import org.apache.druid.utils.RuntimeInfo;
-import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -35,6 +35,7 @@ import org.junit.rules.ExpectedException;
 import java.util.Properties;
 
 /**
+ *
  */
 public class DruidProcessingConfigTest
 {
@@ -43,12 +44,6 @@ public class DruidProcessingConfigTest
   private static final long DIRECT_SIZE = BUFFER_SIZE * (3L + 2L + 1L);
   private static final long HEAP_SIZE = BUFFER_SIZE * 2L;
 
-  @AfterClass
-  public static void teardown()
-  {
-    JvmUtils.resetTestsToDefaultRuntimeInfo();
-  }
-
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
 
@@ -166,6 +161,7 @@ public class DruidProcessingConfigTest
   {
     return makeInjector(numProcessors, directMemorySize, heapSize, new 
Properties());
   }
+
   private static Injector makeInjector(
       int numProcessors,
       long directMemorySize,
@@ -173,14 +169,14 @@ public class DruidProcessingConfigTest
       Properties props
   )
   {
-    Injector injector = new StartupInjectorBuilder().withProperties(props).add(
+    final Injector startupInjector = new 
StartupInjectorBuilder().withProperties(props).build();
+    return Guice.createInjector(
+        startupInjector.getInstance(DruidSecondaryModule.class),
         binder -> {
           binder.bind(RuntimeInfo.class).toInstance(new 
MockRuntimeInfo(numProcessors, directMemorySize, heapSize));
-          binder.requestStaticInjection(JvmUtils.class);
           JsonConfigProvider.bind(binder, "druid.processing", 
DruidProcessingConfig.class);
         }
-    ).build();
-    return injector;
+    );
   }
 
   public static class MockRuntimeInfo extends RuntimeInfo
@@ -202,6 +198,12 @@ public class DruidProcessingConfigTest
       return availableProcessors;
     }
 
+    @Override
+    public long getTotalHeapSizeBytes()
+    {
+      return maxHeapSize;
+    }
+
     @Override
     public long getMaxHeapSizeBytes()
     {
diff --git 
a/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java 
b/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java
index 7faa2ecd965..30bfa22937d 100644
--- a/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java
+++ b/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java
@@ -39,12 +39,12 @@ import java.util.List;
 public interface BrokerClient
 {
   /**
-   * Submit the given {@code sqlQuery} to the Broker's SQL query endpoint.
+   * Submit the given {@code sqlQuery} to the Broker's SQL query endpoint, 
{@code /druid/v2/sql/}.
    */
   ListenableFuture<String> submitSqlQuery(ClientSqlQuery sqlQuery);
 
   /**
-   * Submit the given {@code sqlQuery} to the Broker's SQL task endpoint.
+   * Submit the given {@code sqlQuery} to the Broker's SQL task endpoint, 
{@code /druid/v2/sql/task/}.
    */
   ListenableFuture<SqlTaskStatus> submitSqlTask(ClientSqlQuery sqlQuery);
 
diff --git 
a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java 
b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
index bc12e929219..e737032015e 100644
--- a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
@@ -44,7 +44,7 @@ import org.apache.druid.query.ForwardingQueryProcessingPool;
 import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
-import org.apache.druid.utils.JvmUtils;
+import org.apache.druid.utils.RuntimeInfo;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.ForkJoinPool;
@@ -90,9 +90,9 @@ public class BrokerProcessingModule implements Module
   @Provides
   @LazySingleton
   @Global
-  public NonBlockingPool<ByteBuffer> 
getIntermediateResultsPool(DruidProcessingConfig config)
+  public NonBlockingPool<ByteBuffer> 
getIntermediateResultsPool(DruidProcessingConfig config, RuntimeInfo 
runtimeInfo)
   {
-    verifyDirectMemory(config);
+    verifyDirectMemory(config, runtimeInfo);
     return new StupidPool<>(
         "intermediate processing pool",
         new OffheapBufferGenerator("intermediate processing", 
config.intermediateComputeSizeBytes()),
@@ -104,9 +104,9 @@ public class BrokerProcessingModule implements Module
   @Provides
   @LazySingleton
   @Merging
-  public BlockingPool<ByteBuffer> getMergeBufferPool(DruidProcessingConfig 
config)
+  public BlockingPool<ByteBuffer> getMergeBufferPool(DruidProcessingConfig 
config, RuntimeInfo runtimeInfo)
   {
-    verifyDirectMemory(config);
+    verifyDirectMemory(config, runtimeInfo);
     return new DefaultBlockingPool<>(
         new OffheapBufferGenerator("result merging", 
config.intermediateComputeSizeBytes()),
         config.getNumMergeBuffers()
@@ -144,13 +144,13 @@ public class BrokerProcessingModule implements Module
     return poolProvider.getPool();
   }
 
-  private void verifyDirectMemory(DruidProcessingConfig config)
+  private void verifyDirectMemory(DruidProcessingConfig config, RuntimeInfo 
runtimeInfo)
   {
     final long memoryNeeded = (long) config.intermediateComputeSizeBytes() *
                               (config.getNumMergeBuffers() + 1);
 
     try {
-      final long maxDirectMemory = 
JvmUtils.getRuntimeInfo().getDirectMemorySizeBytes();
+      final long maxDirectMemory = runtimeInfo.getDirectMemorySizeBytes();
 
       if (maxDirectMemory < memoryNeeded) {
         throw new ProvisionException(
diff --git 
a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java 
b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
index 4879b5cd3c7..2cffbcfae4e 100644
--- a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
@@ -49,7 +49,7 @@ import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
 import org.apache.druid.server.metrics.MetricsModule;
-import org.apache.druid.utils.JvmUtils;
+import org.apache.druid.utils.RuntimeInfo;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutorService;
@@ -94,17 +94,17 @@ public class DruidProcessingModule implements Module
   @Provides
   @LazySingleton
   @Global
-  public NonBlockingPool<ByteBuffer> 
getIntermediateResultsPool(DruidProcessingConfig config)
+  public NonBlockingPool<ByteBuffer> 
getIntermediateResultsPool(DruidProcessingConfig config, RuntimeInfo 
runtimeInfo)
   {
-    return createIntermediateResultsPool(config);
+    return createIntermediateResultsPool(config, runtimeInfo);
   }
 
   @Provides
   @LazySingleton
   @Merging
-  public BlockingPool<ByteBuffer> getMergeBufferPool(DruidProcessingConfig 
config)
+  public BlockingPool<ByteBuffer> getMergeBufferPool(DruidProcessingConfig 
config, RuntimeInfo runtimeInfo)
   {
-    return createMergeBufferPool(config);
+    return createMergeBufferPool(config, runtimeInfo);
   }
 
   @Provides
@@ -161,9 +161,12 @@ public class DruidProcessingModule implements Module
     );
   }
 
-  public static NonBlockingPool<ByteBuffer> 
createIntermediateResultsPool(final DruidProcessingConfig config)
+  public static NonBlockingPool<ByteBuffer> createIntermediateResultsPool(
+      final DruidProcessingConfig config,
+      final RuntimeInfo runtimeInfo
+  )
   {
-    verifyDirectMemory(config);
+    verifyDirectMemory(config, runtimeInfo);
     return new StupidPool<>(
         "intermediate processing pool",
         new OffheapBufferGenerator("intermediate processing", 
config.intermediateComputeSizeBytes()),
@@ -172,19 +175,22 @@ public class DruidProcessingModule implements Module
     );
   }
 
-  public static BlockingPool<ByteBuffer> createMergeBufferPool(final 
DruidProcessingConfig config)
+  public static BlockingPool<ByteBuffer> createMergeBufferPool(
+      final DruidProcessingConfig config,
+      final RuntimeInfo runtimeInfo
+  )
   {
-    verifyDirectMemory(config);
+    verifyDirectMemory(config, runtimeInfo);
     return new DefaultBlockingPool<>(
         new OffheapBufferGenerator("result merging", 
config.intermediateComputeSizeBytes()),
         config.getNumMergeBuffers()
     );
   }
 
-  private static void verifyDirectMemory(DruidProcessingConfig config)
+  private static void verifyDirectMemory(final DruidProcessingConfig config, 
final RuntimeInfo runtimeInfo)
   {
     try {
-      final long maxDirectMemory = 
JvmUtils.getRuntimeInfo().getDirectMemorySizeBytes();
+      final long maxDirectMemory = runtimeInfo.getDirectMemorySizeBytes();
       final long memoryNeeded = (long) config.intermediateComputeSizeBytes() *
                                 (config.getNumMergeBuffers() + 
config.getNumThreads() + 1);
 
diff --git 
a/server/src/main/java/org/apache/druid/rpc/indexing/NoopOverlordClient.java 
b/server/src/main/java/org/apache/druid/rpc/indexing/NoopOverlordClient.java
index 2738c63e180..81fccf19f13 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/NoopOverlordClient.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/NoopOverlordClient.java
@@ -108,6 +108,12 @@ public class NoopOverlordClient implements OverlordClient
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public ListenableFuture<Map<String, String>> terminateSupervisor(String 
supervisorId)
+  {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   public ListenableFuture<CloseableIterator<SupervisorStatus>> 
supervisorStatuses()
   {
diff --git 
a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java 
b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
index 75d57fba6b3..c4d34899777 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
@@ -188,6 +188,15 @@ public interface OverlordClient
    */
   ListenableFuture<Map<String, String>> postSupervisor(SupervisorSpec 
supervisor);
 
+  /**
+   * Shuts down a supervisor.
+   * <p>
+   * API: {@code /druid/indexer/v1/supervisor/<id>/terminate}
+   *
+   * @return Map containing a single entry "id"
+   */
+  ListenableFuture<Map<String, String>> terminateSupervisor(String 
supervisorId);
+
   /**
    * Returns all current supervisor statuses.
    */
diff --git 
a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java 
b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java
index 60a36ef4468..0499a62f090 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java
@@ -248,6 +248,23 @@ public class OverlordClientImpl implements OverlordClient
     );
   }
 
+  @Override
+  public ListenableFuture<Map<String, String>> terminateSupervisor(String 
supervisorId)
+  {
+    final String path = StringUtils.format(
+        "/druid/indexer/v1/supervisor/%s/terminate",
+        StringUtils.urlEncode(supervisorId)
+    );
+
+    return FutureUtils.transform(
+        client.asyncRequest(
+            new RequestBuilder(HttpMethod.POST, path),
+            new BytesFullResponseHandler()
+        ),
+        holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), new 
TypeReference<>() {})
+    );
+  }
+
   @Override
   public ListenableFuture<CloseableIterator<SupervisorStatus>> 
supervisorStatuses()
   {
diff --git a/server/src/main/java/org/apache/druid/server/StatusResource.java 
b/server/src/main/java/org/apache/druid/server/StatusResource.java
index cf0ed113c47..50730de91bc 100644
--- a/server/src/main/java/org/apache/druid/server/StatusResource.java
+++ b/server/src/main/java/org/apache/druid/server/StatusResource.java
@@ -30,7 +30,6 @@ import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.server.http.security.ConfigResourceFilter;
 import org.apache.druid.server.http.security.StateResourceFilter;
-import org.apache.druid.utils.JvmUtils;
 import org.apache.druid.utils.RuntimeInfo;
 
 import javax.annotation.Nonnull;
@@ -41,7 +40,6 @@ import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -59,17 +57,20 @@ public class StatusResource
   private final Properties properties;
   private final DruidServerConfig druidServerConfig;
   private final ExtensionsLoader extnLoader;
+  private final RuntimeInfo runtimeInfo;
 
   @Inject
   public StatusResource(
       final Properties properties,
       final DruidServerConfig druidServerConfig,
-      final ExtensionsLoader extnLoader
+      final ExtensionsLoader extnLoader,
+      final RuntimeInfo runtimeInfo
   )
   {
     this.properties = properties;
     this.druidServerConfig = druidServerConfig;
     this.extnLoader = extnLoader;
+    this.runtimeInfo = runtimeInfo;
   }
 
   @GET
@@ -115,7 +116,7 @@ public class StatusResource
       @Context final HttpServletRequest req
   )
   {
-    return new Status(extnLoader.getLoadedModules());
+    return new Status(extnLoader.getLoadedModules(), runtimeInfo);
   }
 
   /**
@@ -136,11 +137,11 @@ public class StatusResource
     final List<ModuleVersion> modules;
     final Memory memory;
 
-    public Status(Collection<DruidModule> modules)
+    public Status(Collection<DruidModule> modules, RuntimeInfo runtimeInfo)
     {
       this.version = getDruidVersion();
       this.modules = getExtensionVersions(modules);
-      this.memory = new Memory(JvmUtils.getRuntimeInfo());
+      this.memory = new Memory(runtimeInfo);
     }
 
     private String getDruidVersion()
diff --git 
a/server/src/main/java/org/apache/druid/server/SubqueryGuardrailHelperProvider.java
 
b/server/src/main/java/org/apache/druid/server/SubqueryGuardrailHelperProvider.java
index ddbc0790084..db9a4caa3cb 100644
--- 
a/server/src/main/java/org/apache/druid/server/SubqueryGuardrailHelperProvider.java
+++ 
b/server/src/main/java/org/apache/druid/server/SubqueryGuardrailHelperProvider.java
@@ -24,24 +24,27 @@ import com.google.inject.Provider;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
 import org.apache.druid.server.initialization.ServerConfig;
-import org.apache.druid.utils.JvmUtils;
+import org.apache.druid.utils.RuntimeInfo;
 
 public class SubqueryGuardrailHelperProvider implements 
Provider<SubqueryGuardrailHelper>
 {
   private final LookupExtractorFactoryContainerProvider 
lookupExtractorFactoryContainerProvider;
   private final ServerConfig serverConfig;
   private final QuerySchedulerConfig querySchedulerConfig;
+  private final RuntimeInfo runtimeInfo;
 
   @Inject
   public SubqueryGuardrailHelperProvider(
       LookupExtractorFactoryContainerProvider 
lookupExtractorFactoryContainerProvider,
       ServerConfig serverConfig,
-      QuerySchedulerConfig querySchedulerConfig
+      QuerySchedulerConfig querySchedulerConfig,
+      RuntimeInfo runtimeInfo
   )
   {
     this.lookupExtractorFactoryContainerProvider = 
lookupExtractorFactoryContainerProvider;
     this.serverConfig = serverConfig;
     this.querySchedulerConfig = querySchedulerConfig;
+    this.runtimeInfo = runtimeInfo;
   }
 
   @Override
@@ -61,7 +64,7 @@ public class SubqueryGuardrailHelperProvider implements 
Provider<SubqueryGuardra
 
     return new SubqueryGuardrailHelper(
         lookupExtractorFactoryContainerProvider,
-        JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes(),
+        runtimeInfo.getMaxHeapSizeBytes(),
         maxConcurrentQueries
     );
   }
diff --git 
a/server/src/test/java/org/apache/druid/guice/BrokerProcessingModuleTest.java 
b/server/src/test/java/org/apache/druid/guice/BrokerProcessingModuleTest.java
index a4bf2e28211..ad33eeb7e01 100644
--- 
a/server/src/test/java/org/apache/druid/guice/BrokerProcessingModuleTest.java
+++ 
b/server/src/test/java/org/apache/druid/guice/BrokerProcessingModuleTest.java
@@ -63,7 +63,7 @@ public class BrokerProcessingModuleTest
   public void testIntermediateResultsPool()
   {
     DruidProcessingConfig druidProcessingConfig = 
injector.getInstance(DruidProcessingConfig.class);
-    target.getIntermediateResultsPool(druidProcessingConfig);
+    target.getIntermediateResultsPool(druidProcessingConfig, 
JvmUtils.getRuntimeInfo());
   }
 
 
@@ -71,7 +71,7 @@ public class BrokerProcessingModuleTest
   public void testMergeBufferPool()
   {
     DruidProcessingConfig druidProcessingConfig = 
injector.getInstance(DruidProcessingConfig.class);
-    target.getMergeBufferPool(druidProcessingConfig);
+    target.getMergeBufferPool(druidProcessingConfig, 
JvmUtils.getRuntimeInfo());
   }
 
   @Test
@@ -106,7 +106,7 @@ public class BrokerProcessingModuleTest
 
     DruidProcessingConfig processingBufferConfig = 
injector1.getInstance(DruidProcessingConfig.class);
     BrokerProcessingModule module = new BrokerProcessingModule();
-    module.getMergeBufferPool(processingBufferConfig);
+    module.getMergeBufferPool(processingBufferConfig, 
JvmUtils.getRuntimeInfo());
   }
 
   private Injector makeInjector(Properties props)
diff --git 
a/server/src/test/java/org/apache/druid/guice/DruidProcessingModuleTest.java 
b/server/src/test/java/org/apache/druid/guice/DruidProcessingModuleTest.java
index 129522d03f5..c95c5f56ea0 100644
--- a/server/src/test/java/org/apache/druid/guice/DruidProcessingModuleTest.java
+++ b/server/src/test/java/org/apache/druid/guice/DruidProcessingModuleTest.java
@@ -41,20 +41,23 @@ public class DruidProcessingModuleTest
     }
 
     DruidProcessingModule module = new DruidProcessingModule();
-    module.getIntermediateResultsPool(new DruidProcessingConfig()
-    {
-      @Override
-      public String getFormatString()
-      {
-        return "test";
-      }
+    module.getIntermediateResultsPool(
+        new DruidProcessingConfig()
+        {
+          @Override
+          public String getFormatString()
+          {
+            return "test";
+          }
 
-      @Override
-      public int intermediateComputeSizeBytes()
-      {
-        return Integer.MAX_VALUE;
-      }
-    });
+          @Override
+          public int intermediateComputeSizeBytes()
+          {
+            return Integer.MAX_VALUE;
+          }
+        },
+        JvmUtils.getRuntimeInfo()
+    );
   }
 
   @Test
@@ -71,7 +74,7 @@ public class DruidProcessingModuleTest
 
     DruidProcessingModule module = new DruidProcessingModule();
     config.getNumInitalBuffersForIntermediatePool();
-    module.getIntermediateResultsPool(config);
+    module.getIntermediateResultsPool(config, JvmUtils.getRuntimeInfo());
   }
 }
 
diff --git 
a/server/src/test/java/org/apache/druid/server/StatusResourceTest.java 
b/server/src/test/java/org/apache/druid/server/StatusResourceTest.java
index 72eac936dc1..eb96b769f73 100644
--- a/server/src/test/java/org/apache/druid/server/StatusResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/StatusResourceTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.guice.StartupInjectorBuilder;
 import org.apache.druid.guice.TestDruidModule;
 import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.utils.JvmUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -46,7 +47,8 @@ public class StatusResourceTest
   {
 
     Collection<DruidModule> modules = ImmutableList.of(new TestDruidModule());
-    List<StatusResource.ModuleVersion> statusResourceModuleList = new 
StatusResource.Status(modules).getModules();
+    List<StatusResource.ModuleVersion> statusResourceModuleList =
+        new StatusResource.Status(modules, 
JvmUtils.getRuntimeInfo()).getModules();
 
     Assert.assertEquals("Status should have all modules loaded!", 
modules.size(), statusResourceModuleList.size());
 
diff --git 
a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java 
b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
index 331fad36ac2..87a5b612147 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
@@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.junit.jupiter.api.Timeout;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -99,6 +100,9 @@ public class LatchableEmitter extends StubServiceEmitter
 
   /**
    * Waits until an event that satisfies the given predicate is emitted.
+   *
+   * @param condition     condition to wait for
+   * @param timeoutMillis timeout, or negative to not use a timeout
    */
   public void waitForEvent(Predicate<Event> condition, long timeoutMillis)
   {
@@ -107,7 +111,8 @@ public class LatchableEmitter extends StubServiceEmitter
 
     triggerConditionEvaluations();
     try {
-      if (!waitCondition.countDownLatch.await(timeoutMillis, 
TimeUnit.MILLISECONDS)) {
+      final long awaitTime = timeoutMillis >= 0 ? timeoutMillis : 
Long.MAX_VALUE;
+      if (!waitCondition.countDownLatch.await(awaitTime, 
TimeUnit.MILLISECONDS)) {
         throw new ISE("Timed out waiting for event");
       }
     }
@@ -120,9 +125,7 @@ public class LatchableEmitter extends StubServiceEmitter
   }
 
   /**
-   * Waits until a metric event that matches the given condition is emitted.
-   *
-   * @throws ISE if a timeout of 30 seconds elapses waiting for the event.
+   * Waits indefinitely until a metric event that matches the given condition 
is emitted.
    */
   public ServiceMetricEvent waitForEvent(UnaryOperator<EventMatcher> condition)
   {
@@ -130,16 +133,14 @@ public class LatchableEmitter extends StubServiceEmitter
     waitForEvent(
         event -> event instanceof ServiceMetricEvent
                  && matcher.test((ServiceMetricEvent) event),
-        30_000
+        -1
     );
     return matcher.matchingEvent.get();
   }
 
   /**
-   * Waits until the overall aggregate of matching events satisfies the given
-   * criteria.
-   *
-   * @throws ISE if a timeout of 5 minutes elapses waiting for the aggregate.
+   * Waits indefinitely until the overall aggregate of matching events 
satisfies the given criteria.
+   * Use {@link Timeout} on the overall test case to get a timeout.
    */
   public void waitForEventAggregate(
       UnaryOperator<EventMatcher> condition,
diff --git a/services/src/main/java/org/apache/druid/cli/GuiceRunnable.java 
b/services/src/main/java/org/apache/druid/cli/GuiceRunnable.java
index e20d64da51e..b7ec16f93f7 100644
--- a/services/src/main/java/org/apache/druid/cli/GuiceRunnable.java
+++ b/services/src/main/java/org/apache/druid/cli/GuiceRunnable.java
@@ -31,7 +31,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.log.StartupLoggingConfig;
-import org.apache.druid.utils.JvmUtils;
+import org.apache.druid.utils.RuntimeInfo;
 
 import java.util.List;
 import java.util.Properties;
@@ -99,10 +99,11 @@ public abstract class GuiceRunnable implements Runnable
     try {
       final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
       final StartupLoggingConfig startupLoggingConfig = 
injector.getInstance(StartupLoggingConfig.class);
+      final RuntimeInfo runtimeInfo = injector.getInstance(RuntimeInfo.class);
 
       Long directSizeBytes = null;
       try {
-        directSizeBytes = JvmUtils.getRuntimeInfo().getDirectMemorySizeBytes();
+        directSizeBytes = runtimeInfo.getDirectMemorySizeBytes();
       }
       catch (UnsupportedOperationException ignore) {
         // querying direct memory is not supported
@@ -110,9 +111,9 @@ public abstract class GuiceRunnable implements Runnable
 
       log.info(
           "Starting up with processors [%,d], memory [%,d], maxMemory [%,d]%s. 
Properties follow.",
-          JvmUtils.getRuntimeInfo().getAvailableProcessors(),
-          JvmUtils.getRuntimeInfo().getTotalHeapSizeBytes(),
-          JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes(),
+          runtimeInfo.getAvailableProcessors(),
+          runtimeInfo.getTotalHeapSizeBytes(),
+          runtimeInfo.getMaxHeapSizeBytes(),
           directSizeBytes != null ? StringUtils.format(", directMemory [%,d]", 
directSizeBytes) : ""
       );
 
diff --git a/services/src/main/java/org/apache/druid/cli/Version.java 
b/services/src/main/java/org/apache/druid/cli/Version.java
index b5355b7f3ca..677c4d9da76 100644
--- a/services/src/main/java/org/apache/druid/cli/Version.java
+++ b/services/src/main/java/org/apache/druid/cli/Version.java
@@ -23,6 +23,7 @@ import com.github.rvesse.airline.annotations.Command;
 import io.netty.util.SuppressForbidden;
 import org.apache.druid.guice.ExtensionsLoader;
 import org.apache.druid.server.StatusResource;
+import org.apache.druid.utils.RuntimeInfo;
 
 import javax.inject.Inject;
 
@@ -35,6 +36,9 @@ public class Version implements Runnable
   @Inject
   private ExtensionsLoader extnLoader;
 
+  @Inject
+  private RuntimeInfo runtimeInfo;
+
   @Override
   @SuppressForbidden(reason = "System#out")
   public void run()
@@ -42,6 +46,6 @@ public class Version implements Runnable
     // Since Version is a command, we don't go through the server
     // process to load modules and they are thus not already loaded.
     // Explicitly load them here.
-    System.out.println(new StatusResource.Status(extnLoader.getModules()));
+    System.out.println(new StatusResource.Status(extnLoader.getModules(), 
runtimeInfo));
   }
 }
diff --git 
a/services/src/test/java/org/apache/druid/testing/embedded/DruidServerResource.java
 
b/services/src/test/java/org/apache/druid/testing/embedded/DruidServerResource.java
index 05b15aafd57..4a82769cc36 100644
--- 
a/services/src/test/java/org/apache/druid/testing/embedded/DruidServerResource.java
+++ 
b/services/src/test/java/org/apache/druid/testing/embedded/DruidServerResource.java
@@ -27,8 +27,6 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.utils.JvmUtils;
-import org.apache.druid.utils.RuntimeInfo;
 
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
@@ -161,10 +159,6 @@ class DruidServerResource implements EmbeddedResource
       final Injector injector = new StartupInjectorBuilder()
           .withProperties(serverProperties)
           .withExtensions()
-          .add(binder -> {
-            binder.bind(RuntimeInfo.class).toInstance(server.getRuntimeInfo());
-            binder.requestStaticInjection(JvmUtils.class);
-          })
           .build();
 
       injector.injectMembers(runnable);
@@ -182,4 +176,12 @@ class DruidServerResource implements EmbeddedResource
       log.info("Stopped server[%s].", server.getName());
     }
   }
+
+  @Override
+  public String toString()
+  {
+    return "DruidServerResource{" +
+           "server=" + server.getName() +
+           '}';
+  }
 }
diff --git 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
index 4bb1dac6d1e..254a09850e4 100644
--- 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
+++ 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
@@ -31,6 +31,7 @@ import org.apache.druid.server.metrics.LatchableEmitter;
 import org.apache.druid.testing.embedded.derby.InMemoryDerbyModule;
 import org.apache.druid.testing.embedded.derby.InMemoryDerbyResource;
 import org.apache.druid.testing.embedded.emitter.LatchableEmitterModule;
+import org.apache.druid.utils.RuntimeInfo;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -50,6 +51,8 @@ import java.util.stream.Collectors;
  * <li>Other {@link EmbeddedResource} to be used in the cluster. For example,
  * an {@link InMemoryDerbyResource}.</li>
  * <li>List of {@link DruidModule} to load specific extensions, e.g. {@link 
InMemoryDerbyModule}.</li>
+ * <li>{@link RuntimeInfoModule} supplying a {@link RuntimeInfo} with values 
matching
+ * {@link EmbeddedDruidServer#setServerMemory} and {@link 
EmbeddedDruidServer#setServerDirectMemory}.</li>
  * <li>{@link #addCommonProperty Common properties} that are applied to all 
Druid
  * services in the cluster.</li>
  * </ul>
@@ -86,6 +89,7 @@ public class EmbeddedDruidCluster implements 
ClusterReferencesProvider, Embedded
   {
     resources.add(testFolder);
     clusterApis = new EmbeddedClusterApis(this);
+    addExtension(RuntimeInfoModule.class);
   }
 
   /**
@@ -242,6 +246,7 @@ public class EmbeddedDruidCluster implements 
ClusterReferencesProvider, Embedded
           startedFirstDruidServer = true;
         }
 
+        log.info("Starting resource[%s].", resource);
         resource.start();
         resource.onStarted(this);
       }
@@ -263,6 +268,7 @@ public class EmbeddedDruidCluster implements 
ClusterReferencesProvider, Embedded
     // Stop the resources in reverse order
     for (EmbeddedResource resource : Lists.reverse(resources)) {
       try {
+        log.info("Stopping resource[%s].", resource);
         resource.stop();
       }
       catch (Exception e) {
diff --git 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidServer.java
 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidServer.java
index 3c0a8f99de1..89ecdd2ac19 100644
--- 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidServer.java
+++ 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidServer.java
@@ -26,7 +26,6 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.query.DruidProcessingConfigTest;
 import org.apache.druid.server.metrics.LatchableEmitter;
 import org.apache.druid.utils.RuntimeInfo;
 
@@ -55,6 +54,8 @@ public abstract class EmbeddedDruidServer<T extends 
EmbeddedDruidServer<T>> impl
   private final String name;
   private final AtomicReference<DruidServerResource> lifecycle = new 
AtomicReference<>();
 
+  private long serverMemory = MEM_100_MB;
+  private long serverDirectMemory = MEM_100_MB;
   private final Map<String, String> serverProperties = new HashMap<>();
   private final ServerReferenceHolder referenceHolder = new 
ServerReferenceHolder();
 
@@ -109,6 +110,24 @@ public abstract class EmbeddedDruidServer<T extends 
EmbeddedDruidServer<T>> impl
     return (T) this;
   }
 
+  /**
+   * Sets the amount of heap memory visible to the server through {@link 
RuntimeInfo}.
+   */
+  public final EmbeddedDruidServer setServerMemory(long serverMemory)
+  {
+    this.serverMemory = serverMemory;
+    return this;
+  }
+
+  /**
+   * Sets the amount of direct (off-heap) memory visible to the server through 
{@link RuntimeInfo}.
+   */
+  public final EmbeddedDruidServer setServerDirectMemory(long 
serverDirectMemory)
+  {
+    this.serverDirectMemory = serverDirectMemory;
+    return this;
+  }
+
   /**
    * Called from {@link EmbeddedDruidCluster#addServer(EmbeddedDruidServer)} to
    * tie the lifecycle of this server to the cluster.
@@ -136,16 +155,6 @@ public abstract class EmbeddedDruidServer<T extends 
EmbeddedDruidServer<T>> impl
       LifecycleInitHandler handler
   );
 
-  /**
-   * {@link RuntimeInfo} to use for this server.
-   *
-   * @return {@link RuntimeInfo} with 2 processors and 100MB memory by default.
-   */
-  RuntimeInfo getRuntimeInfo()
-  {
-    return new DruidProcessingConfigTest.MockRuntimeInfo(2, MEM_100_MB, 
MEM_100_MB);
-  }
-
   /**
    * Properties to be used in the {@code StartupInjectorBuilder} while 
launching
    * this server. This must be called only after all the resources required by
@@ -186,6 +195,10 @@ public abstract class EmbeddedDruidServer<T extends 
EmbeddedDruidServer<T>> impl
       );
     }
 
+    // Add properties for RuntimeInfoModule
+    serverProperties.setProperty(RuntimeInfoModule.SERVER_MEMORY_PROPERTY, 
String.valueOf(serverMemory));
+    
serverProperties.setProperty(RuntimeInfoModule.SERVER_DIRECT_MEMORY_PROPERTY, 
String.valueOf(serverDirectMemory));
+
     serverProperties.putAll(this.serverProperties);
     return serverProperties;
   }
diff --git 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedOverlord.java
 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedOverlord.java
index b2748c5bbb1..3b7b755e052 100644
--- 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedOverlord.java
+++ 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedOverlord.java
@@ -24,8 +24,6 @@ import com.google.inject.Module;
 import org.apache.druid.cli.CliOverlord;
 import org.apache.druid.cli.ServerRunnable;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
-import org.apache.druid.query.DruidProcessingConfigTest;
-import org.apache.druid.utils.RuntimeInfo;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -35,6 +33,8 @@ import java.util.List;
  */
 public class EmbeddedOverlord extends EmbeddedDruidServer<EmbeddedOverlord>
 {
+  private static final long MEM_1000_MB = 1_000_000_000;
+
   public EmbeddedOverlord()
   {
     addProperty("druid.indexer.storage.type", "metadata");
@@ -44,6 +44,8 @@ public class EmbeddedOverlord extends 
EmbeddedDruidServer<EmbeddedOverlord>
     // Keep a small sync timeout so that Peons and Indexers are not stuck
     // handling a change request when Overlord has already shutdown
     addProperty("druid.indexer.runner.syncRequestTimeout", "PT1S");
+
+    setServerMemory(MEM_1000_MB);
   }
 
   @Override
@@ -52,13 +54,6 @@ public class EmbeddedOverlord extends 
EmbeddedDruidServer<EmbeddedOverlord>
     return new Overlord(handler);
   }
 
-  @Override
-  RuntimeInfo getRuntimeInfo()
-  {
-    final long mem1gb = 1_000_000_000;
-    return new DruidProcessingConfigTest.MockRuntimeInfo(4, mem1gb, mem1gb);
-  }
-
   /**
    * Extends {@link CliOverlord} to get the server lifecycle.
    */
diff --git 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedZookeeper.java
 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedZookeeper.java
index 9c26a464d21..89b6544aa4f 100644
--- 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedZookeeper.java
+++ 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedZookeeper.java
@@ -50,4 +50,10 @@ public class EmbeddedZookeeper implements EmbeddedResource
   {
     return zk.getConnectString();
   }
+
+  @Override
+  public String toString()
+  {
+    return "EmbeddedZookeeper";
+  }
 }
diff --git 
a/services/src/test/java/org/apache/druid/testing/embedded/RuntimeInfoModule.java
 
b/services/src/test/java/org/apache/druid/testing/embedded/RuntimeInfoModule.java
new file mode 100644
index 00000000000..079a2fae518
--- /dev/null
+++ 
b/services/src/test/java/org/apache/druid/testing/embedded/RuntimeInfoModule.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded;
+
+import com.google.inject.Binder;
+import com.google.inject.Inject;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.query.DruidProcessingConfigTest;
+import org.apache.druid.utils.RuntimeInfo;
+
+import java.util.Properties;
+
+/**
+ * Module for providing overridden {@link RuntimeInfo}, based on {@link 
EmbeddedDruidServer#setServerMemory(long)}
+ * and {@link EmbeddedDruidServer#setServerDirectMemory(long)}.
+ */
+public class RuntimeInfoModule implements DruidModule
+{
+  public static final String SERVER_MEMORY_PROPERTY = 
"druid.testing.embedded.serverMemory";
+  public static final String SERVER_DIRECT_MEMORY_PROPERTY = 
"druid.testing.embedded.serverDirectMemory";
+  private static final int NUM_PROCESSORS = 2;
+
+  private Properties properties;
+
+  @Inject
+  public void setProperties(Properties properties)
+  {
+    this.properties = properties;
+  }
+
+  @Override
+  public void configure(final Binder binder)
+  {
+    final long serverMemory = 
Long.parseLong(properties.getProperty(SERVER_MEMORY_PROPERTY));
+    final long serverDirectMemory = 
Long.parseLong(properties.getProperty(SERVER_DIRECT_MEMORY_PROPERTY));
+    final DruidProcessingConfigTest.MockRuntimeInfo runtimeInfo =
+        new DruidProcessingConfigTest.MockRuntimeInfo(
+            NUM_PROCESSORS,
+            serverDirectMemory,
+            serverMemory
+        );
+    binder.bind(RuntimeInfo.class).toInstance(runtimeInfo);
+  }
+}
diff --git 
a/services/src/test/java/org/apache/druid/testing/embedded/TestFolder.java 
b/services/src/test/java/org/apache/druid/testing/embedded/TestFolder.java
index 2f4a4dd5605..2ca944f26c3 100644
--- a/services/src/test/java/org/apache/druid/testing/embedded/TestFolder.java
+++ b/services/src/test/java/org/apache/druid/testing/embedded/TestFolder.java
@@ -65,6 +65,14 @@ public class TestFolder implements EmbeddedResource
     }
   }
 
+  @Override
+  public String toString()
+  {
+    return "TestFolder{" +
+           "rootFolder=" + rootFolder +
+           '}';
+  }
+
   private void validateRootFolderInitialized()
   {
     if (rootFolder == null) {
diff --git 
a/services/src/test/java/org/apache/druid/testing/embedded/derby/InMemoryDerbyResource.java
 
b/services/src/test/java/org/apache/druid/testing/embedded/derby/InMemoryDerbyResource.java
index 036b0c7d104..3a2366da23f 100644
--- 
a/services/src/test/java/org/apache/druid/testing/embedded/derby/InMemoryDerbyResource.java
+++ 
b/services/src/test/java/org/apache/druid/testing/embedded/derby/InMemoryDerbyResource.java
@@ -55,4 +55,10 @@ public class InMemoryDerbyResource implements 
EmbeddedResource
     cluster.addCommonProperty("druid.metadata.storage.tables.base", 
connector.getMetadataTablesConfig().getBase());
     cluster.addCommonProperty("druid.metadata.storage.connector.connectURI", 
connector.getJdbcUri());
   }
+
+  @Override
+  public String toString()
+  {
+    return "InMemoryDerbyResource";
+  }
 }
diff --git 
a/services/src/test/resources/META-INF/services/org.apache.druid.initialization.DruidModule
 
b/services/src/test/resources/META-INF/services/org.apache.druid.initialization.DruidModule
index 3be2232f857..de9fee3bb5d 100644
--- 
a/services/src/test/resources/META-INF/services/org.apache.druid.initialization.DruidModule
+++ 
b/services/src/test/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -19,3 +19,4 @@
 
 org.apache.druid.testing.embedded.derby.InMemoryDerbyModule
 org.apache.druid.testing.embedded.emitter.LatchableEmitterModule
+org.apache.druid.testing.embedded.RuntimeInfoModule


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to