This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a2a6981b359 always use StorageMonitor, also make StorageMonitor work 
for MSQ tasks (#19048)
a2a6981b359 is described below

commit a2a6981b359e9557f0ede1667ed4aacb65544bc7
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Mar 3 13:51:14 2026 -0800

    always use StorageMonitor, also make StorageMonitor work for MSQ tasks 
(#19048)
---
 .../embedded/compact/CompactionSupervisorTest.java    | 17 +++++++++++++++++
 .../embedded/query/QueryVirtualStorageTest.java       |  4 ----
 .../druid/msq/indexing/IndexerWorkerContext.java      | 15 ++++++++++++---
 .../org/apache/druid/guice/StorageNodeModule.java     | 15 +++++++++++++++
 .../apache/druid/server/metrics/StorageMonitor.java   | 19 ++++++++++---------
 5 files changed, 54 insertions(+), 16 deletions(-)

diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
index e09be28eb51..37c28ee61bb 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
@@ -40,6 +40,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.filter.EqualityFilter;
@@ -64,6 +65,8 @@ import 
org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfi
 import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
+import org.apache.druid.server.metrics.LatchableEmitter;
+import org.apache.druid.server.metrics.StorageMonitor;
 import org.apache.druid.testing.embedded.EmbeddedBroker;
 import org.apache.druid.testing.embedded.EmbeddedCoordinator;
 import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
@@ -417,6 +420,11 @@ public class CompactionSupervisorTest extends 
EmbeddedClusterTestBase
     );
 
     runCompactionWithSpec(cascadingTemplate);
+
+    // vsf storage monitor metrics are only emitted for MSQ ingestion, so 
picked this compaction test at random to test
+    LatchableEmitter emitter = indexer.latchableEmitter();
+    emitter.waitForNextEvent(event -> 
event.hasMetricName(StorageMonitor.VSF_LOAD_COUNT));
+
     waitForAllCompactionTasksToFinish();
 
     cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, 
broker);
@@ -426,6 +434,15 @@ public class CompactionSupervisorTest extends 
EmbeddedClusterTestBase
 
     // Verify the correct rows were filtered
     verifyNoRowsWithNestedValue("extraInfo", "fieldA", "valueA");
+
+    List<ServiceMetricEvent> events = 
emitter.getMetricEvents(StorageMonitor.VSF_LOAD_COUNT);
+    long count = 0;
+    for (ServiceMetricEvent event : events) {
+      count += event.getValue().longValue();
+      Assertions.assertNotNull(event.getUserDims().get("taskId"));
+      Assertions.assertNotNull(event.getUserDims().get("groupId"));
+    }
+    Assertions.assertTrue(count > 0);
   }
 
   /**
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
index ea16aebb4ff..a55e3859fd6 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
@@ -123,10 +123,6 @@ class QueryVirtualStorageTest extends 
EmbeddedClusterTestBase
         .addCommonProperty("druid.storage.zip", "false")
         .addCommonProperty("druid.indexer.task.buildV10", "true")
         .addCommonProperty("druid.monitoring.emissionPeriod", "PT1s")
-        .addCommonProperty(
-            "druid.monitoring.monitors",
-            "[\"org.apache.druid.server.metrics.StorageMonitor\"]"
-        )
         .addServer(coordinator)
         .addServer(overlord)
         .addServer(indexer)
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
index 22f3598379c..cf76c3f9041 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
@@ -58,8 +58,10 @@ import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.rpc.indexing.SpecificTaskRetryPolicy;
 import org.apache.druid.rpc.indexing.SpecificTaskServiceLocator;
 import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.metrics.StorageMonitor;
 import org.apache.druid.storage.StorageConnector;
 import org.apache.druid.storage.StorageConnectorProvider;
 
@@ -82,6 +84,7 @@ public class IndexerWorkerContext implements WorkerContext
   private final ServiceLocator controllerLocator;
   private final IndexIO indexIO;
   private final SegmentManager segmentManager;
+  private final StorageMonitor storageMonitor;
   @Nullable
   private final CoordinatorClient coordinatorClient;
   private final IndexerDataServerQueryHandlerFactory 
dataServerQueryHandlerFactory;
@@ -104,6 +107,7 @@ public class IndexerWorkerContext implements WorkerContext
       final ServiceLocator controllerLocator,
       final IndexIO indexIO,
       final SegmentManager segmentManager,
+      final StorageMonitor storageMonitor,
       @Nullable final CoordinatorClient coordinatorClient,
       final ServiceClientFactory clientFactory,
       final MemoryIntrospector memoryIntrospector,
@@ -117,6 +121,7 @@ public class IndexerWorkerContext implements WorkerContext
     this.controllerLocator = controllerLocator;
     this.indexIO = indexIO;
     this.segmentManager = segmentManager;
+    this.storageMonitor = storageMonitor;
     this.coordinatorClient = coordinatorClient;
     this.clientFactory = clientFactory;
     this.memoryIntrospector = memoryIntrospector;
@@ -152,10 +157,12 @@ public class IndexerWorkerContext implements WorkerContext
   )
   {
     final IndexIO indexIO = injector.getInstance(IndexIO.class);
-    final SegmentManager segmentManager = new SegmentManager(
+    final SegmentCacheManager cacheManager =
         injector.getInstance(SegmentCacheManagerFactory.class)
-                .manufacturate(new File(toolbox.getIndexingTmpDir(), 
"segment-fetch"), true)
-    );
+                .manufacturate(new File(toolbox.getIndexingTmpDir(), 
"segment-fetch"), true);
+    final SegmentManager segmentManager = new SegmentManager(cacheManager);
+    final StorageMonitor storageMonitor = new StorageMonitor(cacheManager, 
task::getMetricBuilder);
+    toolbox.addMonitor(storageMonitor);
     final ServiceClientFactory serviceClientFactory =
         injector.getInstance(Key.get(ServiceClientFactory.class, 
EscalatedGlobal.class));
     final MemoryIntrospector memoryIntrospector = 
injector.getInstance(MemoryIntrospector.class);
@@ -173,6 +180,7 @@ public class IndexerWorkerContext implements WorkerContext
         new SpecificTaskServiceLocator(task.getControllerTaskId(), 
overlordClient),
         indexIO,
         segmentManager,
+        storageMonitor,
         toolbox.getCoordinatorClient(),
         serviceClientFactory,
         memoryIntrospector,
@@ -330,6 +338,7 @@ public class IndexerWorkerContext implements WorkerContext
   @Override
   public void close()
   {
+    toolbox.removeMonitor(storageMonitor);
     controllerLocator.close();
 
     synchronized (this) {
diff --git a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java 
b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java
index 3f12752f059..e030cd8cd2a 100644
--- a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java
+++ b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java
@@ -21,6 +21,7 @@ package org.apache.druid.guice;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Binder;
+import com.google.inject.Injector;
 import com.google.inject.Module;
 import com.google.inject.Provides;
 import com.google.inject.ProvisionException;
@@ -33,12 +34,15 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.segment.DefaultColumnFormatConfig;
 import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.StorageLocation;
 import org.apache.druid.segment.loading.StorageLocationSelectorStrategy;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.metrics.MetricsModule;
+import org.apache.druid.server.metrics.StorageMonitor;
 
 import javax.annotation.Nullable;
 import java.util.List;
@@ -60,6 +64,7 @@ public class StorageNodeModule implements Module
     bindLocationSelectorStrategy(binder);
     binder.bind(ServerTypeConfig.class).toProvider(Providers.of(null));
     
binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class).in(LazySingleton.class);
+    MetricsModule.register(binder, StorageMonitor.class);
   }
 
   @Provides
@@ -136,6 +141,16 @@ public class StorageNodeModule implements Module
     return config.toStorageLocations();
   }
 
+  @Provides
+  @LazySingleton
+  @Nullable
+  public StorageMonitor provideStorageMonitor(
+      Injector injector
+  )
+  {
+    return new StorageMonitor(injector.getInstance(SegmentCacheManager.class), 
null);
+  }
+
   /**
    * a helper method for both storage module and independent unit test cases
    */
diff --git 
a/server/src/main/java/org/apache/druid/server/metrics/StorageMonitor.java 
b/server/src/main/java/org/apache/druid/server/metrics/StorageMonitor.java
index 7a45bdbfd3d..6ddbd6f587b 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/StorageMonitor.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/StorageMonitor.java
@@ -19,7 +19,7 @@
 
 package org.apache.druid.server.metrics;
 
-import com.google.inject.Inject;
+import com.google.common.base.Supplier;
 import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.guice.annotations.LoadScope;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -30,6 +30,7 @@ import org.apache.druid.segment.loading.StorageLocationStats;
 import org.apache.druid.segment.loading.StorageStats;
 import org.apache.druid.segment.loading.VirtualStorageLocationStats;
 
+import javax.annotation.Nullable;
 import java.util.Map;
 
 /**
@@ -59,13 +60,15 @@ public class StorageMonitor extends AbstractMonitor
   public static final String VSF_REJECT_COUNT = "storage/virtual/reject/count";
 
   private final SegmentCacheManager cacheManager;
+  private final Supplier<ServiceMetricEvent.Builder> builderSupplier;
 
-  @Inject
   public StorageMonitor(
-      SegmentCacheManager cacheManager
+      SegmentCacheManager cacheManager,
+      @Nullable Supplier<ServiceMetricEvent.Builder> builderSupplier
   )
   {
     this.cacheManager = cacheManager;
+    this.builderSupplier = builderSupplier == null ? 
ServiceMetricEvent.Builder::new : builderSupplier;
   }
 
   @Override
@@ -76,8 +79,8 @@ public class StorageMonitor extends AbstractMonitor
     if (stats != null) {
       for (Map.Entry<String, StorageLocationStats> location : 
stats.getLocationStats().entrySet()) {
         final StorageLocationStats staticStats = location.getValue();
-        final ServiceMetricEvent.Builder builder = new 
ServiceMetricEvent.Builder()
-            .setDimension(LOCATION_DIMENSION, location.getKey());
+        final ServiceMetricEvent.Builder builder = builderSupplier.get()
+                                                                  
.setDimension(LOCATION_DIMENSION, location.getKey());
         emitter.emit(builder.setMetric(USED_BYTES, 
staticStats.getUsedBytes()));
         emitter.emit(builder.setMetric(LOAD_COUNT, 
staticStats.getLoadCount()));
         emitter.emit(builder.setMetric(LOAD_BYTES, 
staticStats.getLoadBytes()));
@@ -87,10 +90,8 @@ public class StorageMonitor extends AbstractMonitor
 
       for (Map.Entry<String, VirtualStorageLocationStats> location : 
stats.getVirtualLocationStats().entrySet()) {
         final VirtualStorageLocationStats weakStats = location.getValue();
-        final ServiceMetricEvent.Builder builder = new 
ServiceMetricEvent.Builder().setDimension(
-            LOCATION_DIMENSION,
-            location.getKey()
-        );
+        final ServiceMetricEvent.Builder builder = builderSupplier.get()
+                                                                  
.setDimension(LOCATION_DIMENSION, location.getKey());
         emitter.emit(builder.setMetric(VSF_USED_BYTES, 
weakStats.getUsedBytes()));
         emitter.emit(builder.setMetric(VSF_HIT_COUNT, 
weakStats.getHitCount()));
         emitter.emit(builder.setMetric(VSF_HIT_BYTES, 
weakStats.getHitBytes()));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to