This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 56262132981 add StorageMonitor to measure storage and virtual storage 
usage by segment cache (#18742)
56262132981 is described below

commit 562621329812d6517bd1282eebaf1edaf49ec210
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Dec 2 15:24:45 2025 -0800

    add StorageMonitor to measure storage and virtual storage usage by segment 
cache (#18742)
    
    changes:
    * adds `StorageLocationStats` and `VirtualStorageLocationStats` to abstract 
measuring segment cache/virtual segment cache usage
    * adds `StorageStats` to collect `StorageLocationStats` and 
`VirtualStorageLocationStats` into maps between location label and the stats of 
that location
    * adds new method `SegmentCacheManager.getStorageStats` to expose stats 
collection for segment cache manager implementations
    * adds new `StorageMonitor` to emit metrics for values in `StorageStats`
    * metrics for `StorageLocationStats`:
      - storage/used/bytes
      - storage/load/count
      - storage/load/bytes
      - storage/drop/count
      - storage/drop/bytes
    * metrics for `VirtualStorageLocationStats`:
      - storage/virtual/used/bytes
      - storage/virtual/hit/count
      - storage/virtual/load/count
      - storage/virtual/load/bytes
      - storage/virtual/evict/count
      - storage/virtual/evict/bytes
      - storage/virtual/reject/count
---
 .../embedded/query/QueryVirtualStorageTest.java    | 122 +++++++++-----
 .../java/util/metrics/StubServiceEmitter.java      |  40 ++++-
 .../druid/segment/loading/SegmentCacheManager.java |   6 +
 .../segment/loading/SegmentLocalCacheManager.java  |  27 +++
 .../druid/segment/loading/StorageLocation.java     | 183 ++++++++++++++++-----
 .../segment/loading/StorageLocationStats.java      |  48 ++++++
 .../apache/druid/segment/loading/StorageStats.java |  64 +++++++
 .../loading/VirtualStorageLocationStats.java       |  59 +++++++
 .../druid/server/metrics/StorageMonitor.java       | 104 ++++++++++++
 .../segment/loading/NoopSegmentCacheManager.java   |   8 +
 .../SegmentLocalCacheManagerConcurrencyTest.java   |  66 ++++----
 .../druid/server/metrics/LatchableEmitter.java     |  61 ++++++-
 12 files changed, 669 insertions(+), 119 deletions(-)

diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
index 2c53b80ef4d..a5489041b80 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
@@ -29,6 +29,7 @@ import 
org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
 import org.apache.druid.query.DefaultQueryMetrics;
 import org.apache.druid.query.DruidProcessingConfigTest;
 import org.apache.druid.server.metrics.LatchableEmitter;
+import org.apache.druid.server.metrics.StorageMonitor;
 import org.apache.druid.sql.calcite.planner.Calcites;
 import org.apache.druid.testing.embedded.EmbeddedBroker;
 import org.apache.druid.testing.embedded.EmbeddedCoordinator;
@@ -51,7 +52,6 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
 /**
@@ -102,6 +102,11 @@ class QueryVirtualStorageTest extends 
EmbeddedClusterTestBase
         .useDefaultTimeoutForLatchableEmitter(20)
         .addResource(storageResource)
         .addCommonProperty("druid.storage.zip", "false")
+        .addCommonProperty("druid.monitoring.emissionPeriod", "PT1s")
+        .addCommonProperty(
+            "druid.monitoring.monitors",
+            "[\"org.apache.druid.server.metrics.StorageMonitor\"]"
+        )
         .addServer(coordinator)
         .addServer(overlord)
         .addServer(indexer)
@@ -152,90 +157,123 @@ class QueryVirtualStorageTest extends 
EmbeddedClusterTestBase
         "select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 
14:00:00' and __time < TIMESTAMP '2015-09-12 19:00:00'",
         "select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 
19:00:00' and __time < TIMESTAMP '2015-09-13 00:00:00'"
     };
-    final long[] expectedResults = new long[] {
-        9770,
-        10524,
-        10267,
-        8683
-    };
+    final long[] expectedResults = new long[]{9770, 10524, 10267, 8683};
+    final long[] expectedLoads = new long[]{8L, 6L, 5L, 5L};
+
+
+    LatchableEmitter emitter = historical.latchableEmitter();
+    // clear out the pipe to get zerod out storage monitor metrics
+    ServiceMetricEvent monitorEvent = emitter.waitForNextEvent(event -> 
event.hasMetricName(StorageMonitor.VSF_LOAD_COUNT));
+    while (monitorEvent != null && monitorEvent.getValue().longValue() > 0) {
+      monitorEvent = emitter.waitForNextEvent(event -> 
event.hasMetricName(StorageMonitor.VSF_LOAD_COUNT));
+    }
+    // then flush (which clears out the internal events stores in test 
emitter) so we can do clean sums across them
+    emitter.flush();
+
+    emitter.waitForNextEvent(event -> 
event.hasMetricName(StorageMonitor.VSF_LOAD_COUNT));
+    long beforeLoads = 
emitter.getMetricEventLongSum(StorageMonitor.VSF_LOAD_COUNT);
+    // confirm flushed
+    Assertions.assertEquals(0, beforeLoads);
 
+    // run the queries in order
     Assertions.assertEquals(expectedResults[0], 
Long.parseLong(cluster.runSql(queries[0], dataSource)));
-    assertMetrics(1, 8L);
+    assertQueryMetrics(1, expectedLoads[0]);
     Assertions.assertEquals(expectedResults[1], 
Long.parseLong(cluster.runSql(queries[1], dataSource)));
-    assertMetrics(2, 6L);
+    assertQueryMetrics(2, expectedLoads[1]);
     Assertions.assertEquals(expectedResults[2], 
Long.parseLong(cluster.runSql(queries[2], dataSource)));
-    assertMetrics(3, 5L);
+    assertQueryMetrics(3, expectedLoads[2]);
     Assertions.assertEquals(expectedResults[3], 
Long.parseLong(cluster.runSql(queries[3], dataSource)));
-    assertMetrics(4, 5L);
+    assertQueryMetrics(4, expectedLoads[3]);
 
+    emitter.waitForNextEvent(event -> 
event.hasMetricName(StorageMonitor.VSF_LOAD_COUNT));
+    long firstLoads = 
emitter.getMetricEventLongSum(StorageMonitor.VSF_LOAD_COUNT);
+    Assertions.assertTrue(firstLoads >= 24, "expected " + 24 + " but only got 
" + firstLoads);
+
+    long expectedTotalHits = 0;
+    long expectedTotalLoad = 0;
     for (int i = 0; i < 1000; i++) {
       int nextQuery = ThreadLocalRandom.current().nextInt(queries.length);
       Assertions.assertEquals(expectedResults[nextQuery], 
Long.parseLong(cluster.runSql(queries[nextQuery], dataSource)));
-      assertMetrics(i + 5, null);
+      assertQueryMetrics(i + 5, null);
+      long actualLoads = getMetricLatestValue(emitter, 
DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_COUNT, i + 5);
+      expectedTotalLoad += actualLoads;
+      expectedTotalHits += (expectedLoads[nextQuery] - actualLoads);
     }
+
+    emitter.waitForNextEvent(event -> 
event.hasMetricName(StorageMonitor.VSF_HIT_COUNT));
+    long hits = emitter.getMetricEventLongSum(StorageMonitor.VSF_HIT_COUNT);
+    Assertions.assertTrue(hits >= expectedTotalHits, "expected " + 
expectedTotalHits + " but only got " + hits);
+    emitter.waitForNextEvent(event -> 
event.hasMetricName(StorageMonitor.VSF_LOAD_COUNT));
+    long loads = emitter.getMetricEventLongSum(StorageMonitor.VSF_LOAD_COUNT);
+    Assertions.assertTrue(loads >= expectedTotalLoad, "expected " + 
expectedTotalLoad + " but only got " + loads);
+    
Assertions.assertTrue(emitter.getMetricEventLongSum(StorageMonitor.VSF_LOAD_BYTES)
 > 0);
+    emitter.waitForNextEvent(event -> 
event.hasMetricName(StorageMonitor.VSF_EVICT_COUNT));
+    
Assertions.assertTrue(emitter.getMetricEventLongSum(StorageMonitor.VSF_EVICT_COUNT)
 >= 0);
+    
Assertions.assertTrue(emitter.getMetricEventLongSum(StorageMonitor.VSF_EVICT_BYTES)
 > 0);
+    Assertions.assertEquals(0, 
emitter.getMetricEventLongSum(StorageMonitor.VSF_REJECT_COUNT));
+    
Assertions.assertTrue(emitter.getLatestMetricEventValue(StorageMonitor.VSF_USED_BYTES,
 0).longValue() > 0);
   }
 
-  private void assertMetrics(int expectedEventCount, @Nullable Long 
expectedLoadCount)
+
+  private void assertQueryMetrics(int expectedEventCount, @Nullable Long 
expectedLoadCount)
   {
     LatchableEmitter emitter = historical.latchableEmitter();
-    final int lastIndex = expectedEventCount - 1;
 
-    List<ServiceMetricEvent> countEvents = 
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_COUNT);
-    Assertions.assertEquals(expectedEventCount, countEvents.size());
+    long loadCount = getMetricLatestValue(emitter, 
DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_COUNT, expectedEventCount);
     if (expectedLoadCount != null) {
-      Assertions.assertEquals(expectedLoadCount, 
countEvents.get(lastIndex).getValue());
+      Assertions.assertEquals(expectedLoadCount, loadCount);
     }
-    boolean hasLoads = countEvents.get(lastIndex).getValue().longValue() > 0;
+    boolean hasLoads = loadCount > 0;
 
-    List<ServiceMetricEvent> timeEvents = 
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_BATCH_TIME);
-    Assertions.assertEquals(expectedEventCount, timeEvents.size());
+    long time = getMetricLatestValue(emitter, 
DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_BATCH_TIME, expectedEventCount);
     if (hasLoads) {
-      Assertions.assertTrue(timeEvents.get(lastIndex).getValue().longValue() > 
0);
+      Assertions.assertTrue(time > 0);
     } else {
-      Assertions.assertEquals(0, 
timeEvents.get(lastIndex).getValue().longValue());
+      Assertions.assertEquals(0, time);
     }
 
-    List<ServiceMetricEvent> timeMaxEvents = 
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_TIME_MAX);
-    Assertions.assertEquals(expectedEventCount, timeMaxEvents.size());
+    long maxTime = getMetricLatestValue(emitter, 
DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_TIME_MAX, expectedEventCount);
     if (hasLoads) {
-      
Assertions.assertTrue(timeMaxEvents.get(lastIndex).getValue().longValue() > 0);
+      Assertions.assertTrue(maxTime > 0);
     } else {
-      Assertions.assertEquals(0, 
timeMaxEvents.get(lastIndex).getValue().longValue());
+      Assertions.assertEquals(0, maxTime);
     }
 
-    List<ServiceMetricEvent> timeAvgEvents = 
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_TIME_AVG);
-    Assertions.assertEquals(expectedEventCount, timeAvgEvents.size());
+    long avgTime = getMetricLatestValue(emitter, 
DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_TIME_AVG, expectedEventCount);
     if (hasLoads) {
-      
Assertions.assertTrue(timeAvgEvents.get(lastIndex).getValue().longValue() > 0);
+      Assertions.assertTrue(avgTime > 0);
     } else {
-      Assertions.assertEquals(0, 
timeAvgEvents.get(lastIndex).getValue().longValue());
+      Assertions.assertEquals(0, avgTime);
     }
 
-    List<ServiceMetricEvent> waitMaxEvents = 
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_WAIT_TIME_MAX);
-    Assertions.assertEquals(expectedEventCount, waitMaxEvents.size());
+    long maxWait = getMetricLatestValue(emitter, 
DefaultQueryMetrics.QUERY_ON_DEMAND_WAIT_TIME_MAX, expectedEventCount);
     if (hasLoads) {
-      
Assertions.assertTrue(waitMaxEvents.get(lastIndex).getValue().longValue() >= 0);
+      Assertions.assertTrue(maxWait >= 0);
     } else {
-      Assertions.assertEquals(0, 
waitMaxEvents.get(lastIndex).getValue().longValue());
+      Assertions.assertEquals(0, maxWait);
     }
 
-    List<ServiceMetricEvent> waitAvgEvents = 
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_WAIT_TIME_AVG);
-    Assertions.assertEquals(expectedEventCount, waitAvgEvents.size());
+    long avgWait = getMetricLatestValue(emitter, 
DefaultQueryMetrics.QUERY_ON_DEMAND_WAIT_TIME_AVG, expectedEventCount);
     if (hasLoads) {
-      
Assertions.assertTrue(waitAvgEvents.get(lastIndex).getValue().longValue() >= 0);
+      Assertions.assertTrue(avgWait >= 0);
     } else {
-      Assertions.assertEquals(0, 
waitAvgEvents.get(lastIndex).getValue().longValue());
+      Assertions.assertEquals(0, avgWait);
     }
 
-    List<ServiceMetricEvent> loadSizeEvents = 
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_BYTES);
-    Assertions.assertEquals(expectedEventCount, loadSizeEvents.size());
+    long bytes = getMetricLatestValue(emitter, 
DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_BYTES, expectedEventCount);
     if (hasLoads) {
-      
Assertions.assertTrue(loadSizeEvents.get(lastIndex).getValue().longValue() > 0);
+      Assertions.assertTrue(bytes > 0);
     } else {
-      Assertions.assertEquals(0, 
loadSizeEvents.get(lastIndex).getValue().longValue());
+      Assertions.assertEquals(0, bytes);
     }
   }
 
+  private long getMetricLatestValue(LatchableEmitter emitter, String 
metricName, int expectedCount)
+  {
+    Assertions.assertEquals(expectedCount, 
emitter.getMetricEventCount(metricName));
+    return emitter.getLatestMetricEventValue(metricName, 0).longValue();
+  }
+
   private String createTestDatasourceName()
   {
     return "wiki-" + IdUtils.getRandomId();
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
 
b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
index fac548afb46..8d114644131 100644
--- 
a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
+++ 
b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
@@ -24,9 +24,11 @@ import org.apache.druid.java.util.emitter.service.AlertEvent;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 
+import javax.annotation.Nullable;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -39,9 +41,9 @@ import java.util.concurrent.ConcurrentLinkedDeque;
  */
 public class StubServiceEmitter extends ServiceEmitter implements 
MetricsVerifier
 {
-  private final Queue<Event> events = new ConcurrentLinkedDeque<>();
-  private final Queue<AlertEvent> alertEvents = new ConcurrentLinkedDeque<>();
-  private final ConcurrentHashMap<String, Queue<ServiceMetricEvent>> 
metricEvents = new ConcurrentHashMap<>();
+  private final Deque<Event> events = new ConcurrentLinkedDeque<>();
+  private final Deque<AlertEvent> alertEvents = new ConcurrentLinkedDeque<>();
+  private final ConcurrentHashMap<String, Deque<ServiceMetricEvent>> 
metricEvents = new ConcurrentHashMap<>();
 
   public StubServiceEmitter()
   {
@@ -123,6 +125,38 @@ public class StubServiceEmitter extends ServiceEmitter 
implements MetricsVerifie
     return values;
   }
 
+  public int getMetricEventCount(String metricName)
+  {
+    final Queue<ServiceMetricEvent> metricEventQueue = 
metricEvents.get(metricName);
+    return metricEventQueue == null ? 0 : metricEventQueue.size();
+  }
+
+  public long getMetricEventLongSum(String metricName)
+  {
+    final Queue<ServiceMetricEvent> metricEventQueue = 
metricEvents.get(metricName);
+    long total = 0;
+    for (ServiceMetricEvent event : metricEventQueue) {
+      total += event.getValue().longValue();
+    }
+    return total;
+  }
+
+  @Nullable
+  public Number getLatestMetricEventValue(String metricName)
+  {
+    final Deque<ServiceMetricEvent> metricEventQueue = 
metricEvents.get(metricName);
+    return metricEventQueue == null ? null : 
metricEventQueue.getLast().getValue();
+  }
+
+  public Number getLatestMetricEventValue(String metricName, Number 
defaultValue)
+  {
+    final Number latest = getLatestMetricEventValue(metricName);
+    if (latest == null) {
+      return defaultValue;
+    }
+    return latest;
+  }
+
   @Override
   public void start()
   {
diff --git 
a/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
 
b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
index f75b7ec6d8b..f7f10431ace 100644
--- 
a/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
+++ 
b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
@@ -138,4 +138,10 @@ public interface SegmentCacheManager
   void shutdownBootstrap();
 
   void shutdown();
+
+  /**
+   * Collect {@link StorageStats}, if available.
+   */
+  @Nullable
+  StorageStats getStorageStats();
 }
diff --git 
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
 
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index bc7f2d8d247..f56a4aa1264 100644
--- 
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
+++ 
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -53,8 +53,10 @@ import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
@@ -619,6 +621,31 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
     }
   }
 
+  @Nullable
+  @Override
+  public StorageStats getStorageStats()
+  {
+    if (config.isVirtualStorage()) {
+      final Map<String, VirtualStorageLocationStats> locationStats = new 
HashMap<>();
+      for (StorageLocation location : locations) {
+        locationStats.put(location.getPath().toString(), 
location.resetWeakStats());
+      }
+      return new StorageStats(
+          Map.of(),
+          locationStats
+      );
+    } else {
+      final Map<String, StorageLocationStats> locationStats = new HashMap<>();
+      for (StorageLocation location : locations) {
+        locationStats.put(location.getPath().toString(), 
location.resetStaticStats());
+      }
+      return new StorageStats(
+          locationStats,
+          Map.of()
+      );
+    }
+  }
+
   @VisibleForTesting
   public ConcurrentHashMap<DataSegment, ReferenceCountingLock> 
getSegmentLocks()
   {
diff --git 
a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java 
b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
index 5d51de581a4..ba0341e9d5b 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
@@ -114,10 +114,11 @@ public class StorageLocation
    * Current total size of files in bytes, including weak entries.
    */
   private final AtomicLong currSizeBytes = new AtomicLong(0);
-
+  private final AtomicLong currStaticSizeBytes = new AtomicLong(0);
   private final AtomicLong currWeakSizeBytes = new AtomicLong(0);
 
-  private final AtomicReference<Stats> stats = new AtomicReference<>();
+  private final AtomicReference<StaticStats> staticStats = new 
AtomicReference<>();
+  private final AtomicReference<WeakStats> weakStats = new AtomicReference<>();
 
   /**
    * A {@link ReentrantReadWriteLock.ReadLock} may be used for any operations 
to access {@link #staticCacheEntries} or
@@ -147,7 +148,8 @@ public class StorageLocation
     } else {
       this.freeSpaceToKeep = 0;
     }
-    resetStats();
+    resetStaticStats();
+    resetWeakStats();
   }
 
   /**
@@ -259,6 +261,8 @@ public class StorageLocation
       if (reclaimResult.isSuccess()) {
         staticCacheEntries.put(entry.getId(), entry);
         currSizeBytes.getAndAdd(entry.getSize());
+        currStaticSizeBytes.getAndAdd(entry.getSize());
+        staticStats.getAndUpdate(s -> s.load(entry.getSize()));
       }
       return reclaimResult.isSuccess();
     }
@@ -306,7 +310,7 @@ public class StorageLocation
         final WeakCacheEntry newEntry = new WeakCacheEntry(entry);
         linkNewWeakEntry(newEntry);
         weakCacheEntries.put(entry.getId(), newEntry);
-        stats.get().load();
+        weakStats.getAndUpdate(s -> s.load(entry.getSize()));
       }
       return reclaimResult.isSuccess();
     }
@@ -335,7 +339,7 @@ public class StorageLocation
       WeakCacheEntry existingEntry = weakCacheEntries.get(entryId);
       if (existingEntry != null && existingEntry.hold()) {
         existingEntry.visited = true;
-        stats.get().hit();
+        weakStats.getAndUpdate(WeakStats::hit);
         return new ReservationHold<>((T) existingEntry.cacheEntry, 
existingEntry::release);
       }
       return null;
@@ -369,7 +373,7 @@ public class StorageLocation
       WeakCacheEntry retryExistingEntry = weakCacheEntries.get(entryId);
       if (retryExistingEntry != null && retryExistingEntry.hold()) {
         retryExistingEntry.visited = true;
-        stats.get().hit();
+        weakStats.getAndUpdate(WeakStats::hit);
         return new ReservationHold<>((T) retryExistingEntry.cacheEntry, 
retryExistingEntry::release);
       }
       final CacheEntry newEntry = entrySupplier.get();
@@ -381,7 +385,7 @@ public class StorageLocation
         newWeakEntry.hold();
         linkNewWeakEntry(newWeakEntry);
         weakCacheEntries.put(newEntry.getId(), newWeakEntry);
-        stats.get().load();
+        weakStats.getAndUpdate(s -> s.load(newEntry.getSize()));
         hold = new ReservationHold<>(
             (T) newEntry,
             () -> {
@@ -408,7 +412,7 @@ public class StorageLocation
             }
         );
       } else {
-        stats.get().reject();
+        weakStats.getAndUpdate(WeakStats::reject);
         hold = null;
       }
       return hold;
@@ -431,6 +435,8 @@ public class StorageLocation
         final CacheEntry toRemove = staticCacheEntries.remove(entry.getId());
         toRemove.unmount();
         currSizeBytes.getAndAdd(-entry.getSize());
+        currStaticSizeBytes.getAndAdd(-entry.getSize());
+        staticStats.getAndUpdate(s -> s.drop(entry.getSize()));
       }
     }
     finally {
@@ -578,7 +584,7 @@ public class StorageLocation
             cacheEntryIdentifier -> {
               if (!staticCacheEntries.containsKey(cacheEntryIdentifier)) {
                 removed.unmount();
-                stats.get().unmount();
+                weakStats.getAndUpdate(WeakStats::unmount);
               }
               return null;
             }
@@ -633,17 +639,24 @@ public class StorageLocation
     }
     currSizeBytes.set(0);
     currWeakSizeBytes.set(0);
-    resetStats();
+    currStaticSizeBytes.set(0);
+    resetStaticStats();
+    resetWeakStats();
+  }
+
+  public WeakStats getWeakStats()
+  {
+    return weakStats.get();
   }
 
-  public Stats getStats()
+  public StaticStats resetStaticStats()
   {
-    return stats.get();
+    return staticStats.getAndSet(new StaticStats(currStaticSizeBytes));
   }
 
-  public void resetStats()
+  public WeakStats resetWeakStats()
   {
-    stats.set(new Stats());
+    return weakStats.getAndSet(new WeakStats(currWeakSizeBytes));
   }
 
   /**
@@ -701,11 +714,11 @@ public class StorageLocation
           );
         }
         unlinkWeakEntry(removed);
-        stats.get().evict();
-        toRemove.next = null;
-        toRemove.prev = null;
-        droppedEntries.add(toRemove);
-        sizeFreed += toRemove.cacheEntry.getSize();
+        weakStats.getAndUpdate(s -> s.evict(removed.cacheEntry.getSize()));
+        removed.next = null;
+        removed.prev = null;
+        droppedEntries.add(removed);
+        sizeFreed += removed.cacheEntry.getSize();
         startEntry = null;
       }
 
@@ -897,62 +910,158 @@ public class StorageLocation
     }
   }
 
-  public static final class Stats
+  public static final class StaticStats implements StorageLocationStats
+  {
+    private final AtomicLong sizeUsed;
+    private final AtomicLong loadCount = new AtomicLong(0);
+    private final AtomicLong loadBytes = new AtomicLong(0);
+    private final AtomicLong dropCount = new AtomicLong(0);
+    private final AtomicLong dropBytes = new AtomicLong(0);
+
+    public StaticStats(AtomicLong sizeUsed)
+    {
+      this.sizeUsed = sizeUsed;
+    }
+
+    public StaticStats load(long size)
+    {
+      loadCount.getAndIncrement();
+      loadBytes.getAndAdd(size);
+      return this;
+    }
+
+    public StaticStats drop(long size)
+    {
+      dropCount.getAndIncrement();
+      dropBytes.getAndAdd(size);
+      return this;
+    }
+
+    @Override
+    public long getUsedBytes()
+    {
+      return sizeUsed.get();
+    }
+
+    @Override
+    public long getLoadCount()
+    {
+      return loadCount.get();
+    }
+
+    @Override
+    public long getLoadBytes()
+    {
+      return loadBytes.get();
+    }
+
+    @Override
+    public long getDropCount()
+    {
+      return dropCount.get();
+    }
+
+    @Override
+    public long getDropBytes()
+    {
+      return dropBytes.get();
+    }
+  }
+
+  public static final class WeakStats implements VirtualStorageLocationStats
   {
+    private final AtomicLong sizeUsed;
     private final AtomicLong loadCount = new AtomicLong(0);
+    private final AtomicLong loadBytes = new AtomicLong(0);
     private final AtomicLong rejectionCount = new AtomicLong(0);
     private final AtomicLong hitCount = new AtomicLong(0);
     private final AtomicLong evictionCount = new AtomicLong(0);
+    private final AtomicLong evictionBytes = new AtomicLong(0);
     private final AtomicLong unmountCount = new AtomicLong(0);
 
-    public void hit()
+    public WeakStats(AtomicLong sizeUsed)
     {
-      hitCount.getAndIncrement();
+      this.sizeUsed = sizeUsed;
     }
 
-    public long getHitCount()
+    public WeakStats hit()
     {
-      return hitCount.get();
+      hitCount.getAndIncrement();
+      return this;
     }
 
-    public void load()
+    public WeakStats load(long size)
     {
       loadCount.getAndIncrement();
+      loadBytes.getAndAdd(size);
+      return this;
     }
 
-    public long getLoadCount()
+    public WeakStats evict(long size)
     {
-      return loadCount.get();
+      evictionCount.getAndIncrement();
+      evictionBytes.getAndAdd(size);
+      return this;
     }
 
-    public void evict()
+    public WeakStats unmount()
     {
-      evictionCount.getAndIncrement();
+      unmountCount.getAndIncrement();
+      return this;
     }
 
-    public long getEvictionCount()
+    public WeakStats reject()
     {
-      return evictionCount.get();
+      rejectionCount.getAndIncrement();
+      return this;
     }
 
-    public void unmount()
+    @Override
+    public long getUsedBytes()
     {
-      unmountCount.getAndIncrement();
+      return sizeUsed.get();
     }
 
-    public long getUnmountCount()
+    @Override
+    public long getHitCount()
     {
-      return unmountCount.get();
+      return hitCount.get();
     }
 
-    public void reject()
+    @Override
+    public long getLoadCount()
     {
-      rejectionCount.getAndIncrement();
+      return loadCount.get();
     }
 
+    @Override
+    public long getLoadBytes()
+    {
+      return loadBytes.get();
+    }
+
+    @Override
+    public long getEvictionCount()
+    {
+      return evictionCount.get();
+    }
+
+    @Override
+    public long getEvictionBytes()
+    {
+      return evictionBytes.get();
+    }
+
+    @Override
     public long getRejectCount()
     {
       return rejectionCount.get();
     }
+
+    @VisibleForTesting
+    public long getUnmountCount()
+    {
+      return unmountCount.get();
+    }
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationStats.java
 
b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationStats.java
new file mode 100644
index 00000000000..d6fd58a5bc1
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationStats.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+public interface StorageLocationStats
+{
+  /**
+   * Current number of bytes stored at the time this measurement collection 
was created.
+   */
+  long getUsedBytes();
+
+  /**
+   * Number of load operations during the measurement period
+   */
+  long getLoadCount();
+
+  /**
+   * Number of bytes loaded during the measurement period
+   */
+  long getLoadBytes();
+
+  /**
+   * Number of drop operations during the measurement period
+   */
+  long getDropCount();
+
+  /**
+   * Number of bytes dropped during the measurement period
+   */
+  long getDropBytes();
+}
diff --git 
a/server/src/main/java/org/apache/druid/segment/loading/StorageStats.java 
b/server/src/main/java/org/apache/druid/segment/loading/StorageStats.java
new file mode 100644
index 00000000000..bfd4dc65e2b
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/segment/loading/StorageStats.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import org.apache.druid.guice.annotations.UnstableApi;
+
+import java.util.Map;
+
+/**
+ * Collection of {@link StorageLocationStats} and {@link 
VirtualStorageLocationStats} for all storage locations within
+ * a {@link SegmentCacheManager} so that {@link 
SegmentCacheManager#getStorageStats()} can be used by
+ * {@link org.apache.druid.server.metrics.StorageMonitor} to track segment 
cache activity.
+ * <p>
+ * Note that the stats are not tied explicitly to the {@link StorageLocation} 
implementation used by
+ * {@link SegmentLocalCacheManager}, but it does implement this stuff.
+ */
+@UnstableApi
+public class StorageStats
+{
+  private final Map<String, StorageLocationStats> stats;
+  private final Map<String, VirtualStorageLocationStats> virtualStats;
+
+  public StorageStats(
+      final Map<String, StorageLocationStats> stats,
+      final Map<String, VirtualStorageLocationStats> virtualStats
+  )
+  {
+    this.stats = stats;
+    this.virtualStats = virtualStats;
+  }
+
+  /**
+   * Map of location label (such as file path) to {@link StorageLocationStats}
+   */
+  public Map<String, StorageLocationStats> getLocationStats()
+  {
+    return stats;
+  }
+
+  /**
+   * Map of location label (such as file path) to {@link 
VirtualStorageLocationStats}
+   */
+  public Map<String, VirtualStorageLocationStats> getVirtualLocationStats()
+  {
+    return virtualStats;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/segment/loading/VirtualStorageLocationStats.java
 
b/server/src/main/java/org/apache/druid/segment/loading/VirtualStorageLocationStats.java
new file mode 100644
index 00000000000..4929b60de3b
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/segment/loading/VirtualStorageLocationStats.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+public interface VirtualStorageLocationStats
+{
+  /**
+   * Current number of bytes stored which are managed as virtual storage at 
the time this measurement collection was
+   * created.
+   */
+  long getUsedBytes();
+
+  /**
+   * Number of operations for which an entry was already present during the 
measurement period
+   */
+  long getHitCount();
+
+  /**
+   * Number of operations for which an entry was missing and was loaded into 
the cache during the measurement period
+   */
+  long getLoadCount();
+
+  /**
+   * Number of bytes loaded for entries missing from the cache during the 
measurement period
+   */
+  long getLoadBytes();
+
+  /**
+   * Number of cache entries removed during the measurement period
+   */
+  long getEvictionCount();
+
+  /**
+   * Number of bytes removed from the cache during the measurement period
+   */
+  long getEvictionBytes();
+
+  /**
+   * Number of operations which could not be loaded due to insufficient space 
during the measurement period
+   */
+  long getRejectCount();
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/metrics/StorageMonitor.java 
b/server/src/main/java/org/apache/druid/server/metrics/StorageMonitor.java
new file mode 100644
index 00000000000..eac35dad205
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/metrics/StorageMonitor.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.inject.Inject;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.annotations.LoadScope;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.AbstractMonitor;
+import org.apache.druid.segment.loading.SegmentCacheManager;
+import org.apache.druid.segment.loading.StorageLocationStats;
+import org.apache.druid.segment.loading.StorageStats;
+import org.apache.druid.segment.loading.VirtualStorageLocationStats;
+
+import java.util.Map;
+
+/**
+ * Monitor to emit output of {@link SegmentCacheManager#getStorageStats()}
+ */
+@LoadScope(roles = {
+    NodeRole.BROKER_JSON_NAME,
+    NodeRole.HISTORICAL_JSON_NAME,
+    NodeRole.INDEXER_JSON_NAME,
+    NodeRole.PEON_JSON_NAME
+})
+public class StorageMonitor extends AbstractMonitor
+{
+  public static final String LOCATION_DIMENSION = "location";
+  public static final String USED_BYTES = "storage/used/bytes";
+  public static final String LOAD_COUNT = "storage/load/count";
+  public static final String LOAD_BYTES = "storage/load/bytes";
+  public static final String DROP_COUNT = "storage/drop/count";
+  public static final String DROP_BYTES = "storage/drop/bytes";
+  public static final String VSF_USED_BYTES = "storage/virtual/used/bytes";
+  public static final String VSF_HIT_COUNT = "storage/virtual/hit/count";
+  public static final String VSF_LOAD_COUNT = "storage/virtual/load/count";
+  public static final String VSF_LOAD_BYTES = "storage/virtual/load/bytes";
+  public static final String VSF_EVICT_COUNT = "storage/virtual/evict/count";
+  public static final String VSF_EVICT_BYTES = "storage/virtual/evict/bytes";
+  public static final String VSF_REJECT_COUNT = "storage/virtual/reject/count";
+
+  private final SegmentCacheManager cacheManager;
+
+  @Inject
+  public StorageMonitor(
+      SegmentCacheManager cacheManager
+  )
+  {
+    this.cacheManager = cacheManager;
+  }
+
+  @Override
+  public boolean doMonitor(ServiceEmitter emitter)
+  {
+    final StorageStats stats = cacheManager.getStorageStats();
+
+    if (stats != null) {
+      for (Map.Entry<String, StorageLocationStats> location : 
stats.getLocationStats().entrySet()) {
+        final StorageLocationStats staticStats = location.getValue();
+        final ServiceMetricEvent.Builder builder = new 
ServiceMetricEvent.Builder()
+            .setDimension(LOCATION_DIMENSION, location.getKey());
+        emitter.emit(builder.setMetric(USED_BYTES, 
staticStats.getUsedBytes()));
+        emitter.emit(builder.setMetric(LOAD_COUNT, 
staticStats.getLoadCount()));
+        emitter.emit(builder.setMetric(LOAD_BYTES, 
staticStats.getLoadBytes()));
+        emitter.emit(builder.setMetric(DROP_COUNT, 
staticStats.getDropCount()));
+        emitter.emit(builder.setMetric(DROP_BYTES, 
staticStats.getDropBytes()));
+      }
+
+      for (Map.Entry<String, VirtualStorageLocationStats> location : 
stats.getVirtualLocationStats().entrySet()) {
+        final VirtualStorageLocationStats weakStats = location.getValue();
+        final ServiceMetricEvent.Builder builder = new 
ServiceMetricEvent.Builder().setDimension(
+            LOCATION_DIMENSION,
+            location.getKey()
+        );
+        emitter.emit(builder.setMetric(VSF_USED_BYTES, 
weakStats.getUsedBytes()));
+        emitter.emit(builder.setMetric(VSF_HIT_COUNT, 
weakStats.getHitCount()));
+        emitter.emit(builder.setMetric(VSF_LOAD_COUNT, 
weakStats.getLoadCount()));
+        emitter.emit(builder.setMetric(VSF_LOAD_BYTES, 
weakStats.getLoadBytes()));
+        emitter.emit(builder.setMetric(VSF_EVICT_COUNT, 
weakStats.getEvictionCount()));
+        emitter.emit(builder.setMetric(VSF_EVICT_BYTES, 
weakStats.getEvictionBytes()));
+        emitter.emit(builder.setMetric(VSF_REJECT_COUNT, 
weakStats.getRejectCount()));
+      }
+    }
+    return true;
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/segment/loading/NoopSegmentCacheManager.java
 
b/server/src/test/java/org/apache/druid/segment/loading/NoopSegmentCacheManager.java
index 9ee49251f11..6121dcaf656 100644
--- 
a/server/src/test/java/org/apache/druid/segment/loading/NoopSegmentCacheManager.java
+++ 
b/server/src/test/java/org/apache/druid/segment/loading/NoopSegmentCacheManager.java
@@ -23,6 +23,7 @@ import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.SegmentLazyLoadFailCallback;
 import org.apache.druid.timeline.DataSegment;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.util.List;
 import java.util.Optional;
@@ -116,4 +117,11 @@ public class NoopSegmentCacheManager implements 
SegmentCacheManager
   {
     throw new UnsupportedOperationException();
   }
+
+  @Nullable
+  @Override
+  public StorageStats getStorageStats()
+  {
+    return null;
+  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
 
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
index 4af946982f3..aa759c7e120 100644
--- 
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
@@ -205,7 +205,7 @@ class SegmentLocalCacheManagerConcurrencyTest
       virtualStorageManager.drop(segment);
     }
     for (StorageLocation location : virtualStorageManager.getLocations()) {
-      location.resetStats();
+      location.resetWeakStats();
     }
   }
 
@@ -430,8 +430,8 @@ class SegmentLocalCacheManagerConcurrencyTest
     // 5 because __drop path
     Assertions.assertTrue(5 >= location.getPath().listFiles().length);
     Assertions.assertTrue(5 >= location2.getPath().listFiles().length);
-    Assertions.assertEquals(location.getStats().getEvictionCount(), 
location.getStats().getUnmountCount());
-    Assertions.assertEquals(location2.getStats().getEvictionCount(), 
location2.getStats().getUnmountCount());
+    Assertions.assertEquals(location.getWeakStats().getEvictionCount(), 
location.getWeakStats().getUnmountCount());
+    Assertions.assertEquals(location2.getWeakStats().getEvictionCount(), 
location2.getWeakStats().getUnmountCount());
   }
 
   @Test
@@ -488,8 +488,8 @@ class SegmentLocalCacheManagerConcurrencyTest
       }
     }
 
-    Assertions.assertTrue(location.getStats().getHitCount() >= 0);
-    Assertions.assertTrue(location2.getStats().getHitCount() >= 0);
+    Assertions.assertTrue(location.getWeakStats().getHitCount() >= 0);
+    Assertions.assertTrue(location2.getWeakStats().getHitCount() >= 0);
     assertNoLooseEnds();
   }
 
@@ -564,8 +564,8 @@ class SegmentLocalCacheManagerConcurrencyTest
     Assertions.assertTrue(totalSuccess > ((iterations / 10) * minLoadCount));
     // expect at least some empties from the segment not being cached
     Assertions.assertTrue(totalEmpty > 0);
-    Assertions.assertTrue(location.getStats().getHitCount() >= 0);
-    Assertions.assertTrue(location2.getStats().getHitCount() >= 0);
+    Assertions.assertTrue(location.getWeakStats().getHitCount() >= 0);
+    Assertions.assertTrue(location2.getWeakStats().getHitCount() >= 0);
 
     assertNoLooseEnds();
   }
@@ -599,17 +599,17 @@ class SegmentLocalCacheManagerConcurrencyTest
         totalFailures += result.exceptions.size();
         Assertions.assertEquals(
             totalSuccess,
-            location.getStats().getLoadCount() +
-            location.getStats().getHitCount() +
-            location2.getStats().getLoadCount() +
-            location2.getStats().getHitCount(),
+            location.getWeakStats().getLoadCount() +
+            location.getWeakStats().getHitCount() +
+            location2.getWeakStats().getLoadCount() +
+            location2.getWeakStats().getHitCount(),
             StringUtils.format(
                 "iteration[%s] - loc1: loads[%s] hits[%s] loc2: loads[%s] 
hits[%s]",
                 i,
-                location.getStats().getLoadCount(),
-                location.getStats().getHitCount(),
-                location2.getStats().getLoadCount(),
-                location2.getStats().getHitCount()
+                location.getWeakStats().getLoadCount(),
+                location.getWeakStats().getHitCount(),
+                location2.getWeakStats().getLoadCount(),
+                location2.getWeakStats().getHitCount()
             )
         );
 
@@ -626,20 +626,20 @@ class SegmentLocalCacheManagerConcurrencyTest
     Assertions.assertEquals(iterations, totalSuccess + totalFailures);
     Assertions.assertEquals(
         totalSuccess,
-        location.getStats().getLoadCount()
-        + location.getStats().getHitCount()
-        + location2.getStats().getLoadCount()
-        + location2.getStats().getHitCount()
+        location.getWeakStats().getLoadCount()
+        + location.getWeakStats().getHitCount()
+        + location2.getWeakStats().getLoadCount()
+        + location2.getWeakStats().getHitCount()
     );
-    Assertions.assertTrue(totalFailures <= 
location.getStats().getRejectCount() + location2.getStats()
-                                                                               
            .getRejectCount());
+    Assertions.assertTrue(totalFailures <= 
location.getWeakStats().getRejectCount() + location2.getWeakStats()
+                                                                               
                .getRejectCount());
 
     if (expectHits) {
-      Assertions.assertTrue(location.getStats().getHitCount() >= 0);
-      Assertions.assertTrue(location2.getStats().getHitCount() >= 0);
+      Assertions.assertTrue(location.getWeakStats().getHitCount() >= 0);
+      Assertions.assertTrue(location2.getWeakStats().getHitCount() >= 0);
     } else {
-      Assertions.assertEquals(0, location.getStats().getHitCount());
-      Assertions.assertEquals(0, location2.getStats().getHitCount());
+      Assertions.assertEquals(0, location.getWeakStats().getHitCount());
+      Assertions.assertEquals(0, location2.getWeakStats().getHitCount());
     }
 
     assertNoLooseEnds();
@@ -719,14 +719,14 @@ class SegmentLocalCacheManagerConcurrencyTest
     // 5 because __drop path
     Assertions.assertTrue(5 >= location.getPath().listFiles().length);
     Assertions.assertTrue(5 >= location2.getPath().listFiles().length);
-    Assertions.assertTrue(location.getStats().getLoadCount() >= 4);
-    Assertions.assertTrue(location2.getStats().getLoadCount() >= 4);
-    Assertions.assertEquals(location.getStats().getEvictionCount(), 
location.getStats().getUnmountCount());
-    Assertions.assertEquals(location2.getStats().getEvictionCount(), 
location2.getStats().getUnmountCount());
-    Assertions.assertEquals(location.getStats().getLoadCount() - 4, 
location.getStats().getEvictionCount());
-    Assertions.assertEquals(location2.getStats().getLoadCount() - 4, 
location2.getStats().getEvictionCount());
-    Assertions.assertEquals(location.getStats().getLoadCount() - 4, 
location.getStats().getUnmountCount());
-    Assertions.assertEquals(location2.getStats().getLoadCount() - 4, 
location2.getStats().getUnmountCount());
+    Assertions.assertTrue(location.getWeakStats().getLoadCount() >= 4);
+    Assertions.assertTrue(location2.getWeakStats().getLoadCount() >= 4);
+    Assertions.assertEquals(location.getWeakStats().getEvictionCount(), 
location.getWeakStats().getUnmountCount());
+    Assertions.assertEquals(location2.getWeakStats().getEvictionCount(), 
location2.getWeakStats().getUnmountCount());
+    Assertions.assertEquals(location.getWeakStats().getLoadCount() - 4, 
location.getWeakStats().getEvictionCount());
+    Assertions.assertEquals(location2.getWeakStats().getLoadCount() - 4, 
location2.getWeakStats().getEvictionCount());
+    Assertions.assertEquals(location.getWeakStats().getLoadCount() - 4, 
location.getWeakStats().getUnmountCount());
+    Assertions.assertEquals(location2.getWeakStats().getLoadCount() - 4, 
location2.getWeakStats().getUnmountCount());
   }
 
   private void makeSegmentsToLoad(
diff --git 
a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java 
b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
index 5d8b110e17a..75949084db3 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
@@ -112,7 +112,8 @@ public class LatchableEmitter extends StubServiceEmitter
   }
 
   /**
-   * Waits until an event that satisfies the given predicate is emitted.
+   * Waits until an event that satisfies the given predicate is emitted, 
considering all previously observed events
+   * since the last call to {@link #flush()} as potential candidates for match 
as well.
    *
    * @param condition     condition to wait for
    * @param timeoutMillis timeout, or negative to not use a timeout
@@ -137,15 +138,56 @@ public class LatchableEmitter extends StubServiceEmitter
   }
 
   /**
-   * Wait until a metric event that matches the given condition is emitted.
+   * Waits until the next event that satisfies the given predicate is emitted. 
This method should only be called
+   * if the caller is certain that a metric matching the condition is going to 
be emitted, such as from a monitor.
+   *
+   * @param condition     condition to wait for
+   * @param timeoutMillis timeout, or negative to not use a timeout
+   */
+  public void waitForNextEvent(Predicate<Event> condition, long timeoutMillis)
+  {
+    final WaitCondition waitCondition = new WaitCondition(condition);
+    registerWaitConditionNextEvent(waitCondition);
+
+    try {
+      final long awaitTime = timeoutMillis >= 0 ? timeoutMillis : 
Long.MAX_VALUE;
+      if (!waitCondition.countDownLatch.await(awaitTime, 
TimeUnit.MILLISECONDS)) {
+        throw new ISE("Timed out waiting for next event");
+      }
+    }
+    catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+    finally {
+      waitConditions.remove(waitCondition);
+    }
+  }
+
+  /**
+   * Wait until a metric event that matches the given condition is emitted, 
considering all previously observed events
+   * since the last call to {@link #flush()} as potential candidates for match 
as well.
    * Uses the {@link LatchableEmitterConfig#defaultWaitTimeoutMillis}.
    */
   public ServiceMetricEvent waitForEvent(UnaryOperator<EventMatcher> condition)
   {
     final EventMatcher matcher = condition.apply(new EventMatcher());
     waitForEvent(
-        event -> event instanceof ServiceMetricEvent
-                 && matcher.test((ServiceMetricEvent) event),
+        event -> event instanceof ServiceMetricEvent && 
matcher.test((ServiceMetricEvent) event),
+        defaultWaitTimeoutMillis
+    );
+    return matcher.matchingEvent.get();
+  }
+
+  /**
+   * Wait until the next metric event that matches the given condition is 
emitted. This method should only be called
+   * if the caller is certain that a metric matching the condition is going to 
be emitted, such as from a monitor.
+   * Uses the {@link LatchableEmitterConfig#defaultWaitTimeoutMillis}.
+   */
+  public ServiceMetricEvent waitForNextEvent(UnaryOperator<EventMatcher> 
condition)
+  {
+    final EventMatcher matcher = condition.apply(new EventMatcher());
+    waitForNextEvent(
+        event -> event instanceof ServiceMetricEvent && 
matcher.test((ServiceMetricEvent) event),
         defaultWaitTimeoutMillis
     );
     return matcher.matchingEvent.get();
@@ -227,6 +269,17 @@ public class LatchableEmitter extends StubServiceEmitter
     }
   }
 
+  /**
+   * Evaluates the given new condition for all past events and then adds it to
+   * {@link #waitConditions}.
+   */
+  private void registerWaitConditionNextEvent(WaitCondition condition)
+  {
+    eventProcessingLock.lock();
+    waitConditions.add(condition);
+    eventProcessingLock.unlock();
+  }
+
   private static class WaitCondition
   {
     private final Predicate<Event> predicate;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to