This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a2a6981b359 always use StorageMonitor, also make StorageMonitor work for MSQ tasks (#19048)
a2a6981b359 is described below
commit a2a6981b359e9557f0ede1667ed4aacb65544bc7
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Mar 3 13:51:14 2026 -0800
always use StorageMonitor, also make StorageMonitor work for MSQ tasks (#19048)
---
.../embedded/compact/CompactionSupervisorTest.java | 17 +++++++++++++++++
.../embedded/query/QueryVirtualStorageTest.java | 4 ----
.../druid/msq/indexing/IndexerWorkerContext.java | 15 ++++++++++++---
.../org/apache/druid/guice/StorageNodeModule.java | 15 +++++++++++++++
.../apache/druid/server/metrics/StorageMonitor.java | 19 ++++++++++---------
5 files changed, 54 insertions(+), 16 deletions(-)
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
index e09be28eb51..37c28ee61bb 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
@@ -40,6 +40,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.EqualityFilter;
@@ -64,6 +65,8 @@ import
org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfi
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
+import org.apache.druid.server.metrics.LatchableEmitter;
+import org.apache.druid.server.metrics.StorageMonitor;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
@@ -417,6 +420,11 @@ public class CompactionSupervisorTest extends
EmbeddedClusterTestBase
);
runCompactionWithSpec(cascadingTemplate);
+
+ // vsf storage monitor metrics are only emitted for MSQ ingestion, so picked this compaction test at random to test
+ LatchableEmitter emitter = indexer.latchableEmitter();
+ emitter.waitForNextEvent(event ->
event.hasMetricName(StorageMonitor.VSF_LOAD_COUNT));
+
waitForAllCompactionTasksToFinish();
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator,
broker);
@@ -426,6 +434,15 @@ public class CompactionSupervisorTest extends
EmbeddedClusterTestBase
// Verify the correct rows were filtered
verifyNoRowsWithNestedValue("extraInfo", "fieldA", "valueA");
+
+ List<ServiceMetricEvent> events =
emitter.getMetricEvents(StorageMonitor.VSF_LOAD_COUNT);
+ long count = 0;
+ for (ServiceMetricEvent event : events) {
+ count += event.getValue().longValue();
+ Assertions.assertNotNull(event.getUserDims().get("taskId"));
+ Assertions.assertNotNull(event.getUserDims().get("groupId"));
+ }
+ Assertions.assertTrue(count > 0);
}
/**
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
index ea16aebb4ff..a55e3859fd6 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
@@ -123,10 +123,6 @@ class QueryVirtualStorageTest extends
EmbeddedClusterTestBase
.addCommonProperty("druid.storage.zip", "false")
.addCommonProperty("druid.indexer.task.buildV10", "true")
.addCommonProperty("druid.monitoring.emissionPeriod", "PT1s")
- .addCommonProperty(
- "druid.monitoring.monitors",
- "[\"org.apache.druid.server.metrics.StorageMonitor\"]"
- )
.addServer(coordinator)
.addServer(overlord)
.addServer(indexer)
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
index 22f3598379c..cf76c3f9041 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
@@ -58,8 +58,10 @@ import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.rpc.indexing.SpecificTaskRetryPolicy;
import org.apache.druid.rpc.indexing.SpecificTaskServiceLocator;
import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.metrics.StorageMonitor;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.StorageConnectorProvider;
@@ -82,6 +84,7 @@ public class IndexerWorkerContext implements WorkerContext
private final ServiceLocator controllerLocator;
private final IndexIO indexIO;
private final SegmentManager segmentManager;
+ private final StorageMonitor storageMonitor;
@Nullable
private final CoordinatorClient coordinatorClient;
private final IndexerDataServerQueryHandlerFactory
dataServerQueryHandlerFactory;
@@ -104,6 +107,7 @@ public class IndexerWorkerContext implements WorkerContext
final ServiceLocator controllerLocator,
final IndexIO indexIO,
final SegmentManager segmentManager,
+ final StorageMonitor storageMonitor,
@Nullable final CoordinatorClient coordinatorClient,
final ServiceClientFactory clientFactory,
final MemoryIntrospector memoryIntrospector,
@@ -117,6 +121,7 @@ public class IndexerWorkerContext implements WorkerContext
this.controllerLocator = controllerLocator;
this.indexIO = indexIO;
this.segmentManager = segmentManager;
+ this.storageMonitor = storageMonitor;
this.coordinatorClient = coordinatorClient;
this.clientFactory = clientFactory;
this.memoryIntrospector = memoryIntrospector;
@@ -152,10 +157,12 @@ public class IndexerWorkerContext implements WorkerContext
)
{
final IndexIO indexIO = injector.getInstance(IndexIO.class);
- final SegmentManager segmentManager = new SegmentManager(
+ final SegmentCacheManager cacheManager =
injector.getInstance(SegmentCacheManagerFactory.class)
- .manufacturate(new File(toolbox.getIndexingTmpDir(),
"segment-fetch"), true)
- );
+ .manufacturate(new File(toolbox.getIndexingTmpDir(),
"segment-fetch"), true);
+ final SegmentManager segmentManager = new SegmentManager(cacheManager);
+ final StorageMonitor storageMonitor = new StorageMonitor(cacheManager,
task::getMetricBuilder);
+ toolbox.addMonitor(storageMonitor);
final ServiceClientFactory serviceClientFactory =
injector.getInstance(Key.get(ServiceClientFactory.class,
EscalatedGlobal.class));
final MemoryIntrospector memoryIntrospector =
injector.getInstance(MemoryIntrospector.class);
@@ -173,6 +180,7 @@ public class IndexerWorkerContext implements WorkerContext
new SpecificTaskServiceLocator(task.getControllerTaskId(),
overlordClient),
indexIO,
segmentManager,
+ storageMonitor,
toolbox.getCoordinatorClient(),
serviceClientFactory,
memoryIntrospector,
@@ -330,6 +338,7 @@ public class IndexerWorkerContext implements WorkerContext
@Override
public void close()
{
+ toolbox.removeMonitor(storageMonitor);
controllerLocator.close();
synchronized (this) {
diff --git a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java
index 3f12752f059..e030cd8cd2a 100644
--- a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java
+++ b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java
@@ -21,6 +21,7 @@ package org.apache.druid.guice;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Binder;
+import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.ProvisionException;
@@ -33,12 +34,15 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.segment.DefaultColumnFormatConfig;
import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocation;
import org.apache.druid.segment.loading.StorageLocationSelectorStrategy;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.metrics.MetricsModule;
+import org.apache.druid.server.metrics.StorageMonitor;
import javax.annotation.Nullable;
import java.util.List;
@@ -60,6 +64,7 @@ public class StorageNodeModule implements Module
bindLocationSelectorStrategy(binder);
binder.bind(ServerTypeConfig.class).toProvider(Providers.of(null));
binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class).in(LazySingleton.class);
+ MetricsModule.register(binder, StorageMonitor.class);
}
@Provides
@@ -136,6 +141,16 @@ public class StorageNodeModule implements Module
return config.toStorageLocations();
}
+ @Provides
+ @LazySingleton
+ @Nullable
+ public StorageMonitor provideStorageMonitor(
+ Injector injector
+ )
+ {
+ return new StorageMonitor(injector.getInstance(SegmentCacheManager.class),
null);
+ }
+
/**
* a helper method for both storage module and independent unit test cases
*/
diff --git a/server/src/main/java/org/apache/druid/server/metrics/StorageMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/StorageMonitor.java
index 7a45bdbfd3d..6ddbd6f587b 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/StorageMonitor.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/StorageMonitor.java
@@ -19,7 +19,7 @@
package org.apache.druid.server.metrics;
-import com.google.inject.Inject;
+import com.google.common.base.Supplier;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.annotations.LoadScope;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -30,6 +30,7 @@ import org.apache.druid.segment.loading.StorageLocationStats;
import org.apache.druid.segment.loading.StorageStats;
import org.apache.druid.segment.loading.VirtualStorageLocationStats;
+import javax.annotation.Nullable;
import java.util.Map;
/**
@@ -59,13 +60,15 @@ public class StorageMonitor extends AbstractMonitor
public static final String VSF_REJECT_COUNT = "storage/virtual/reject/count";
private final SegmentCacheManager cacheManager;
+ private final Supplier<ServiceMetricEvent.Builder> builderSupplier;
- @Inject
public StorageMonitor(
- SegmentCacheManager cacheManager
+ SegmentCacheManager cacheManager,
+ @Nullable Supplier<ServiceMetricEvent.Builder> builderSupplier
)
{
this.cacheManager = cacheManager;
+ this.builderSupplier = builderSupplier == null ?
ServiceMetricEvent.Builder::new : builderSupplier;
}
@Override
@@ -76,8 +79,8 @@ public class StorageMonitor extends AbstractMonitor
if (stats != null) {
for (Map.Entry<String, StorageLocationStats> location :
stats.getLocationStats().entrySet()) {
final StorageLocationStats staticStats = location.getValue();
- final ServiceMetricEvent.Builder builder = new
ServiceMetricEvent.Builder()
- .setDimension(LOCATION_DIMENSION, location.getKey());
+ final ServiceMetricEvent.Builder builder = builderSupplier.get()
+
.setDimension(LOCATION_DIMENSION, location.getKey());
emitter.emit(builder.setMetric(USED_BYTES,
staticStats.getUsedBytes()));
emitter.emit(builder.setMetric(LOAD_COUNT,
staticStats.getLoadCount()));
emitter.emit(builder.setMetric(LOAD_BYTES,
staticStats.getLoadBytes()));
@@ -87,10 +90,8 @@ public class StorageMonitor extends AbstractMonitor
for (Map.Entry<String, VirtualStorageLocationStats> location :
stats.getVirtualLocationStats().entrySet()) {
final VirtualStorageLocationStats weakStats = location.getValue();
- final ServiceMetricEvent.Builder builder = new
ServiceMetricEvent.Builder().setDimension(
- LOCATION_DIMENSION,
- location.getKey()
- );
+ final ServiceMetricEvent.Builder builder = builderSupplier.get()
+
.setDimension(LOCATION_DIMENSION, location.getKey());
emitter.emit(builder.setMetric(VSF_USED_BYTES,
weakStats.getUsedBytes()));
emitter.emit(builder.setMetric(VSF_HIT_COUNT,
weakStats.getHitCount()));
emitter.emit(builder.setMetric(VSF_HIT_BYTES,
weakStats.getHitBytes()));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]