This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 56262132981 add StorageMonitor to measure storage and virtual storage
usage by segment cache (#18742)
56262132981 is described below
commit 562621329812d6517bd1282eebaf1edaf49ec210
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Dec 2 15:24:45 2025 -0800
add StorageMonitor to measure storage and virtual storage usage by segment
cache (#18742)
changes:
* adds `StorageLocationStats` and `VirtualStorageLocationStats` to abstract
measuring segment cache/virtual segment cache usage
* adds `StorageStats` to collect `StorageLocationStats` and
`VirtualStorageLocationStats` into maps between location label and the stats of
that location
* adds new method `SegmentCacheManager.getStorageStats` to expose stats
collection for segment cache manager implementations
* adds new `StorageMonitor` to emit metrics for values in `StorageStats`
* metrics for `StorageLocationStats`:
- storage/used/bytes
- storage/load/count
- storage/load/bytes
- storage/drop/count
- storage/drop/bytes
* metrics for `VirtualStorageLocationStats`:
- storage/virtual/used/bytes
- storage/virtual/hit/count
- storage/virtual/load/count
- storage/virtual/load/bytes
- storage/virtual/evict/count
- storage/virtual/evict/bytes
- storage/virtual/reject/count
---
.../embedded/query/QueryVirtualStorageTest.java | 122 +++++++++-----
.../java/util/metrics/StubServiceEmitter.java | 40 ++++-
.../druid/segment/loading/SegmentCacheManager.java | 6 +
.../segment/loading/SegmentLocalCacheManager.java | 27 +++
.../druid/segment/loading/StorageLocation.java | 183 ++++++++++++++++-----
.../segment/loading/StorageLocationStats.java | 48 ++++++
.../apache/druid/segment/loading/StorageStats.java | 64 +++++++
.../loading/VirtualStorageLocationStats.java | 59 +++++++
.../druid/server/metrics/StorageMonitor.java | 104 ++++++++++++
.../segment/loading/NoopSegmentCacheManager.java | 8 +
.../SegmentLocalCacheManagerConcurrencyTest.java | 66 ++++----
.../druid/server/metrics/LatchableEmitter.java | 61 ++++++-
12 files changed, 669 insertions(+), 119 deletions(-)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
index 2c53b80ef4d..a5489041b80 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
@@ -29,6 +29,7 @@ import
org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.query.DefaultQueryMetrics;
import org.apache.druid.query.DruidProcessingConfigTest;
import org.apache.druid.server.metrics.LatchableEmitter;
+import org.apache.druid.server.metrics.StorageMonitor;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
@@ -51,7 +52,6 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collections;
-import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
@@ -102,6 +102,11 @@ class QueryVirtualStorageTest extends
EmbeddedClusterTestBase
.useDefaultTimeoutForLatchableEmitter(20)
.addResource(storageResource)
.addCommonProperty("druid.storage.zip", "false")
+ .addCommonProperty("druid.monitoring.emissionPeriod", "PT1s")
+ .addCommonProperty(
+ "druid.monitoring.monitors",
+ "[\"org.apache.druid.server.metrics.StorageMonitor\"]"
+ )
.addServer(coordinator)
.addServer(overlord)
.addServer(indexer)
@@ -152,90 +157,123 @@ class QueryVirtualStorageTest extends
EmbeddedClusterTestBase
"select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12
14:00:00' and __time < TIMESTAMP '2015-09-12 19:00:00'",
"select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12
19:00:00' and __time < TIMESTAMP '2015-09-13 00:00:00'"
};
- final long[] expectedResults = new long[] {
- 9770,
- 10524,
- 10267,
- 8683
- };
+ final long[] expectedResults = new long[]{9770, 10524, 10267, 8683};
+ final long[] expectedLoads = new long[]{8L, 6L, 5L, 5L};
+
+
+ LatchableEmitter emitter = historical.latchableEmitter();
+ // clear out the pipe to get zeroed-out storage monitor metrics
+ ServiceMetricEvent monitorEvent = emitter.waitForNextEvent(event ->
event.hasMetricName(StorageMonitor.VSF_LOAD_COUNT));
+ while (monitorEvent != null && monitorEvent.getValue().longValue() > 0) {
+ monitorEvent = emitter.waitForNextEvent(event ->
event.hasMetricName(StorageMonitor.VSF_LOAD_COUNT));
+ }
+ // then flush (which clears out the internal event stores in the test
emitter) so we can do clean sums across them
+ emitter.flush();
+
+ emitter.waitForNextEvent(event ->
event.hasMetricName(StorageMonitor.VSF_LOAD_COUNT));
+ long beforeLoads =
emitter.getMetricEventLongSum(StorageMonitor.VSF_LOAD_COUNT);
+ // confirm flushed
+ Assertions.assertEquals(0, beforeLoads);
+ // run the queries in order
Assertions.assertEquals(expectedResults[0],
Long.parseLong(cluster.runSql(queries[0], dataSource)));
- assertMetrics(1, 8L);
+ assertQueryMetrics(1, expectedLoads[0]);
Assertions.assertEquals(expectedResults[1],
Long.parseLong(cluster.runSql(queries[1], dataSource)));
- assertMetrics(2, 6L);
+ assertQueryMetrics(2, expectedLoads[1]);
Assertions.assertEquals(expectedResults[2],
Long.parseLong(cluster.runSql(queries[2], dataSource)));
- assertMetrics(3, 5L);
+ assertQueryMetrics(3, expectedLoads[2]);
Assertions.assertEquals(expectedResults[3],
Long.parseLong(cluster.runSql(queries[3], dataSource)));
- assertMetrics(4, 5L);
+ assertQueryMetrics(4, expectedLoads[3]);
+ emitter.waitForNextEvent(event ->
event.hasMetricName(StorageMonitor.VSF_LOAD_COUNT));
+ long firstLoads =
emitter.getMetricEventLongSum(StorageMonitor.VSF_LOAD_COUNT);
+ Assertions.assertTrue(firstLoads >= 24, "expected " + 24 + " but only got
" + firstLoads);
+
+ long expectedTotalHits = 0;
+ long expectedTotalLoad = 0;
for (int i = 0; i < 1000; i++) {
int nextQuery = ThreadLocalRandom.current().nextInt(queries.length);
Assertions.assertEquals(expectedResults[nextQuery],
Long.parseLong(cluster.runSql(queries[nextQuery], dataSource)));
- assertMetrics(i + 5, null);
+ assertQueryMetrics(i + 5, null);
+ long actualLoads = getMetricLatestValue(emitter,
DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_COUNT, i + 5);
+ expectedTotalLoad += actualLoads;
+ expectedTotalHits += (expectedLoads[nextQuery] - actualLoads);
}
+
+ emitter.waitForNextEvent(event ->
event.hasMetricName(StorageMonitor.VSF_HIT_COUNT));
+ long hits = emitter.getMetricEventLongSum(StorageMonitor.VSF_HIT_COUNT);
+ Assertions.assertTrue(hits >= expectedTotalHits, "expected " +
expectedTotalHits + " but only got " + hits);
+ emitter.waitForNextEvent(event ->
event.hasMetricName(StorageMonitor.VSF_LOAD_COUNT));
+ long loads = emitter.getMetricEventLongSum(StorageMonitor.VSF_LOAD_COUNT);
+ Assertions.assertTrue(loads >= expectedTotalLoad, "expected " +
expectedTotalLoad + " but only got " + loads);
+
Assertions.assertTrue(emitter.getMetricEventLongSum(StorageMonitor.VSF_LOAD_BYTES)
> 0);
+ emitter.waitForNextEvent(event ->
event.hasMetricName(StorageMonitor.VSF_EVICT_COUNT));
+
Assertions.assertTrue(emitter.getMetricEventLongSum(StorageMonitor.VSF_EVICT_COUNT)
>= 0);
+
Assertions.assertTrue(emitter.getMetricEventLongSum(StorageMonitor.VSF_EVICT_BYTES)
> 0);
+ Assertions.assertEquals(0,
emitter.getMetricEventLongSum(StorageMonitor.VSF_REJECT_COUNT));
+
Assertions.assertTrue(emitter.getLatestMetricEventValue(StorageMonitor.VSF_USED_BYTES,
0).longValue() > 0);
}
- private void assertMetrics(int expectedEventCount, @Nullable Long
expectedLoadCount)
+
+ private void assertQueryMetrics(int expectedEventCount, @Nullable Long
expectedLoadCount)
{
LatchableEmitter emitter = historical.latchableEmitter();
- final int lastIndex = expectedEventCount - 1;
- List<ServiceMetricEvent> countEvents =
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_COUNT);
- Assertions.assertEquals(expectedEventCount, countEvents.size());
+ long loadCount = getMetricLatestValue(emitter,
DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_COUNT, expectedEventCount);
if (expectedLoadCount != null) {
- Assertions.assertEquals(expectedLoadCount,
countEvents.get(lastIndex).getValue());
+ Assertions.assertEquals(expectedLoadCount, loadCount);
}
- boolean hasLoads = countEvents.get(lastIndex).getValue().longValue() > 0;
+ boolean hasLoads = loadCount > 0;
- List<ServiceMetricEvent> timeEvents =
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_BATCH_TIME);
- Assertions.assertEquals(expectedEventCount, timeEvents.size());
+ long time = getMetricLatestValue(emitter,
DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_BATCH_TIME, expectedEventCount);
if (hasLoads) {
- Assertions.assertTrue(timeEvents.get(lastIndex).getValue().longValue() >
0);
+ Assertions.assertTrue(time > 0);
} else {
- Assertions.assertEquals(0,
timeEvents.get(lastIndex).getValue().longValue());
+ Assertions.assertEquals(0, time);
}
- List<ServiceMetricEvent> timeMaxEvents =
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_TIME_MAX);
- Assertions.assertEquals(expectedEventCount, timeMaxEvents.size());
+ long maxTime = getMetricLatestValue(emitter,
DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_TIME_MAX, expectedEventCount);
if (hasLoads) {
-
Assertions.assertTrue(timeMaxEvents.get(lastIndex).getValue().longValue() > 0);
+ Assertions.assertTrue(maxTime > 0);
} else {
- Assertions.assertEquals(0,
timeMaxEvents.get(lastIndex).getValue().longValue());
+ Assertions.assertEquals(0, maxTime);
}
- List<ServiceMetricEvent> timeAvgEvents =
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_TIME_AVG);
- Assertions.assertEquals(expectedEventCount, timeAvgEvents.size());
+ long avgTime = getMetricLatestValue(emitter,
DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_TIME_AVG, expectedEventCount);
if (hasLoads) {
-
Assertions.assertTrue(timeAvgEvents.get(lastIndex).getValue().longValue() > 0);
+ Assertions.assertTrue(avgTime > 0);
} else {
- Assertions.assertEquals(0,
timeAvgEvents.get(lastIndex).getValue().longValue());
+ Assertions.assertEquals(0, avgTime);
}
- List<ServiceMetricEvent> waitMaxEvents =
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_WAIT_TIME_MAX);
- Assertions.assertEquals(expectedEventCount, waitMaxEvents.size());
+ long maxWait = getMetricLatestValue(emitter,
DefaultQueryMetrics.QUERY_ON_DEMAND_WAIT_TIME_MAX, expectedEventCount);
if (hasLoads) {
-
Assertions.assertTrue(waitMaxEvents.get(lastIndex).getValue().longValue() >= 0);
+ Assertions.assertTrue(maxWait >= 0);
} else {
- Assertions.assertEquals(0,
waitMaxEvents.get(lastIndex).getValue().longValue());
+ Assertions.assertEquals(0, maxWait);
}
- List<ServiceMetricEvent> waitAvgEvents =
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_WAIT_TIME_AVG);
- Assertions.assertEquals(expectedEventCount, waitAvgEvents.size());
+ long avgWait = getMetricLatestValue(emitter,
DefaultQueryMetrics.QUERY_ON_DEMAND_WAIT_TIME_AVG, expectedEventCount);
if (hasLoads) {
-
Assertions.assertTrue(waitAvgEvents.get(lastIndex).getValue().longValue() >= 0);
+ Assertions.assertTrue(avgWait >= 0);
} else {
- Assertions.assertEquals(0,
waitAvgEvents.get(lastIndex).getValue().longValue());
+ Assertions.assertEquals(0, avgWait);
}
- List<ServiceMetricEvent> loadSizeEvents =
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_BYTES);
- Assertions.assertEquals(expectedEventCount, loadSizeEvents.size());
+ long bytes = getMetricLatestValue(emitter,
DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_BYTES, expectedEventCount);
if (hasLoads) {
-
Assertions.assertTrue(loadSizeEvents.get(lastIndex).getValue().longValue() > 0);
+ Assertions.assertTrue(bytes > 0);
} else {
- Assertions.assertEquals(0,
loadSizeEvents.get(lastIndex).getValue().longValue());
+ Assertions.assertEquals(0, bytes);
}
}
+ private long getMetricLatestValue(LatchableEmitter emitter, String
metricName, int expectedCount)
+ {
+ Assertions.assertEquals(expectedCount,
emitter.getMetricEventCount(metricName));
+ return emitter.getLatestMetricEventValue(metricName, 0).longValue();
+ }
+
private String createTestDatasourceName()
{
return "wiki-" + IdUtils.getRandomId();
diff --git
a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
index fac548afb46..8d114644131 100644
---
a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
+++
b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
@@ -24,9 +24,11 @@ import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import javax.annotation.Nullable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -39,9 +41,9 @@ import java.util.concurrent.ConcurrentLinkedDeque;
*/
public class StubServiceEmitter extends ServiceEmitter implements
MetricsVerifier
{
- private final Queue<Event> events = new ConcurrentLinkedDeque<>();
- private final Queue<AlertEvent> alertEvents = new ConcurrentLinkedDeque<>();
- private final ConcurrentHashMap<String, Queue<ServiceMetricEvent>>
metricEvents = new ConcurrentHashMap<>();
+ private final Deque<Event> events = new ConcurrentLinkedDeque<>();
+ private final Deque<AlertEvent> alertEvents = new ConcurrentLinkedDeque<>();
+ private final ConcurrentHashMap<String, Deque<ServiceMetricEvent>>
metricEvents = new ConcurrentHashMap<>();
public StubServiceEmitter()
{
@@ -123,6 +125,38 @@ public class StubServiceEmitter extends ServiceEmitter
implements MetricsVerifie
return values;
}
+ public int getMetricEventCount(String metricName)
+ {
+ final Queue<ServiceMetricEvent> metricEventQueue =
metricEvents.get(metricName);
+ return metricEventQueue == null ? 0 : metricEventQueue.size();
+ }
+
+ public long getMetricEventLongSum(String metricName)
+ {
+ final Queue<ServiceMetricEvent> metricEventQueue =
metricEvents.get(metricName);
+ long total = 0;
+ for (ServiceMetricEvent event : metricEventQueue) {
+ total += event.getValue().longValue();
+ }
+ return total;
+ }
+
+ @Nullable
+ public Number getLatestMetricEventValue(String metricName)
+ {
+ final Deque<ServiceMetricEvent> metricEventQueue =
metricEvents.get(metricName);
+ return metricEventQueue == null ? null :
metricEventQueue.getLast().getValue();
+ }
+
+ public Number getLatestMetricEventValue(String metricName, Number
defaultValue)
+ {
+ final Number latest = getLatestMetricEventValue(metricName);
+ if (latest == null) {
+ return defaultValue;
+ }
+ return latest;
+ }
+
@Override
public void start()
{
diff --git
a/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
index f75b7ec6d8b..f7f10431ace 100644
---
a/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
+++
b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
@@ -138,4 +138,10 @@ public interface SegmentCacheManager
void shutdownBootstrap();
void shutdown();
+
+ /**
+ * Collect {@link StorageStats}, if available.
+ */
+ @Nullable
+ StorageStats getStorageStats();
}
diff --git
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index bc7f2d8d247..f56a4aa1264 100644
---
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
+++
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -53,8 +53,10 @@ import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
@@ -619,6 +621,31 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
}
}
+ @Nullable
+ @Override
+ public StorageStats getStorageStats()
+ {
+ if (config.isVirtualStorage()) {
+ final Map<String, VirtualStorageLocationStats> locationStats = new
HashMap<>();
+ for (StorageLocation location : locations) {
+ locationStats.put(location.getPath().toString(),
location.resetWeakStats());
+ }
+ return new StorageStats(
+ Map.of(),
+ locationStats
+ );
+ } else {
+ final Map<String, StorageLocationStats> locationStats = new HashMap<>();
+ for (StorageLocation location : locations) {
+ locationStats.put(location.getPath().toString(),
location.resetStaticStats());
+ }
+ return new StorageStats(
+ locationStats,
+ Map.of()
+ );
+ }
+ }
+
@VisibleForTesting
public ConcurrentHashMap<DataSegment, ReferenceCountingLock>
getSegmentLocks()
{
diff --git
a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
index 5d51de581a4..ba0341e9d5b 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
@@ -114,10 +114,11 @@ public class StorageLocation
* Current total size of files in bytes, including weak entries.
*/
private final AtomicLong currSizeBytes = new AtomicLong(0);
-
+ private final AtomicLong currStaticSizeBytes = new AtomicLong(0);
private final AtomicLong currWeakSizeBytes = new AtomicLong(0);
- private final AtomicReference<Stats> stats = new AtomicReference<>();
+ private final AtomicReference<StaticStats> staticStats = new
AtomicReference<>();
+ private final AtomicReference<WeakStats> weakStats = new AtomicReference<>();
/**
* A {@link ReentrantReadWriteLock.ReadLock} may be used for any operations
to access {@link #staticCacheEntries} or
@@ -147,7 +148,8 @@ public class StorageLocation
} else {
this.freeSpaceToKeep = 0;
}
- resetStats();
+ resetStaticStats();
+ resetWeakStats();
}
/**
@@ -259,6 +261,8 @@ public class StorageLocation
if (reclaimResult.isSuccess()) {
staticCacheEntries.put(entry.getId(), entry);
currSizeBytes.getAndAdd(entry.getSize());
+ currStaticSizeBytes.getAndAdd(entry.getSize());
+ staticStats.getAndUpdate(s -> s.load(entry.getSize()));
}
return reclaimResult.isSuccess();
}
@@ -306,7 +310,7 @@ public class StorageLocation
final WeakCacheEntry newEntry = new WeakCacheEntry(entry);
linkNewWeakEntry(newEntry);
weakCacheEntries.put(entry.getId(), newEntry);
- stats.get().load();
+ weakStats.getAndUpdate(s -> s.load(entry.getSize()));
}
return reclaimResult.isSuccess();
}
@@ -335,7 +339,7 @@ public class StorageLocation
WeakCacheEntry existingEntry = weakCacheEntries.get(entryId);
if (existingEntry != null && existingEntry.hold()) {
existingEntry.visited = true;
- stats.get().hit();
+ weakStats.getAndUpdate(WeakStats::hit);
return new ReservationHold<>((T) existingEntry.cacheEntry,
existingEntry::release);
}
return null;
@@ -369,7 +373,7 @@ public class StorageLocation
WeakCacheEntry retryExistingEntry = weakCacheEntries.get(entryId);
if (retryExistingEntry != null && retryExistingEntry.hold()) {
retryExistingEntry.visited = true;
- stats.get().hit();
+ weakStats.getAndUpdate(WeakStats::hit);
return new ReservationHold<>((T) retryExistingEntry.cacheEntry,
retryExistingEntry::release);
}
final CacheEntry newEntry = entrySupplier.get();
@@ -381,7 +385,7 @@ public class StorageLocation
newWeakEntry.hold();
linkNewWeakEntry(newWeakEntry);
weakCacheEntries.put(newEntry.getId(), newWeakEntry);
- stats.get().load();
+ weakStats.getAndUpdate(s -> s.load(newEntry.getSize()));
hold = new ReservationHold<>(
(T) newEntry,
() -> {
@@ -408,7 +412,7 @@ public class StorageLocation
}
);
} else {
- stats.get().reject();
+ weakStats.getAndUpdate(WeakStats::reject);
hold = null;
}
return hold;
@@ -431,6 +435,8 @@ public class StorageLocation
final CacheEntry toRemove = staticCacheEntries.remove(entry.getId());
toRemove.unmount();
currSizeBytes.getAndAdd(-entry.getSize());
+ currStaticSizeBytes.getAndAdd(-entry.getSize());
+ staticStats.getAndUpdate(s -> s.drop(entry.getSize()));
}
}
finally {
@@ -578,7 +584,7 @@ public class StorageLocation
cacheEntryIdentifier -> {
if (!staticCacheEntries.containsKey(cacheEntryIdentifier)) {
removed.unmount();
- stats.get().unmount();
+ weakStats.getAndUpdate(WeakStats::unmount);
}
return null;
}
@@ -633,17 +639,24 @@ public class StorageLocation
}
currSizeBytes.set(0);
currWeakSizeBytes.set(0);
- resetStats();
+ currStaticSizeBytes.set(0);
+ resetStaticStats();
+ resetWeakStats();
+ }
+
+ public WeakStats getWeakStats()
+ {
+ return weakStats.get();
}
- public Stats getStats()
+ public StaticStats resetStaticStats()
{
- return stats.get();
+ return staticStats.getAndSet(new StaticStats(currStaticSizeBytes));
}
- public void resetStats()
+ public WeakStats resetWeakStats()
{
- stats.set(new Stats());
+ return weakStats.getAndSet(new WeakStats(currWeakSizeBytes));
}
/**
@@ -701,11 +714,11 @@ public class StorageLocation
);
}
unlinkWeakEntry(removed);
- stats.get().evict();
- toRemove.next = null;
- toRemove.prev = null;
- droppedEntries.add(toRemove);
- sizeFreed += toRemove.cacheEntry.getSize();
+ weakStats.getAndUpdate(s -> s.evict(removed.cacheEntry.getSize()));
+ removed.next = null;
+ removed.prev = null;
+ droppedEntries.add(removed);
+ sizeFreed += removed.cacheEntry.getSize();
startEntry = null;
}
@@ -897,62 +910,158 @@ public class StorageLocation
}
}
- public static final class Stats
+ public static final class StaticStats implements StorageLocationStats
+ {
+ private final AtomicLong sizeUsed;
+ private final AtomicLong loadCount = new AtomicLong(0);
+ private final AtomicLong loadBytes = new AtomicLong(0);
+ private final AtomicLong dropCount = new AtomicLong(0);
+ private final AtomicLong dropBytes = new AtomicLong(0);
+
+ public StaticStats(AtomicLong sizeUsed)
+ {
+ this.sizeUsed = sizeUsed;
+ }
+
+ public StaticStats load(long size)
+ {
+ loadCount.getAndIncrement();
+ loadBytes.getAndAdd(size);
+ return this;
+ }
+
+ public StaticStats drop(long size)
+ {
+ dropCount.getAndIncrement();
+ dropBytes.getAndAdd(size);
+ return this;
+ }
+
+ @Override
+ public long getUsedBytes()
+ {
+ return sizeUsed.get();
+ }
+
+ @Override
+ public long getLoadCount()
+ {
+ return loadCount.get();
+ }
+
+ @Override
+ public long getLoadBytes()
+ {
+ return loadBytes.get();
+ }
+
+ @Override
+ public long getDropCount()
+ {
+ return dropCount.get();
+ }
+
+ @Override
+ public long getDropBytes()
+ {
+ return dropBytes.get();
+ }
+ }
+
+ public static final class WeakStats implements VirtualStorageLocationStats
{
+ private final AtomicLong sizeUsed;
private final AtomicLong loadCount = new AtomicLong(0);
+ private final AtomicLong loadBytes = new AtomicLong(0);
private final AtomicLong rejectionCount = new AtomicLong(0);
private final AtomicLong hitCount = new AtomicLong(0);
private final AtomicLong evictionCount = new AtomicLong(0);
+ private final AtomicLong evictionBytes = new AtomicLong(0);
private final AtomicLong unmountCount = new AtomicLong(0);
- public void hit()
+ public WeakStats(AtomicLong sizeUsed)
{
- hitCount.getAndIncrement();
+ this.sizeUsed = sizeUsed;
}
- public long getHitCount()
+ public WeakStats hit()
{
- return hitCount.get();
+ hitCount.getAndIncrement();
+ return this;
}
- public void load()
+ public WeakStats load(long size)
{
loadCount.getAndIncrement();
+ loadBytes.getAndAdd(size);
+ return this;
}
- public long getLoadCount()
+ public WeakStats evict(long size)
{
- return loadCount.get();
+ evictionCount.getAndIncrement();
+ evictionBytes.getAndAdd(size);
+ return this;
}
- public void evict()
+ public WeakStats unmount()
{
- evictionCount.getAndIncrement();
+ unmountCount.getAndIncrement();
+ return this;
}
- public long getEvictionCount()
+ public WeakStats reject()
{
- return evictionCount.get();
+ rejectionCount.getAndIncrement();
+ return this;
}
- public void unmount()
+ @Override
+ public long getUsedBytes()
{
- unmountCount.getAndIncrement();
+ return sizeUsed.get();
}
- public long getUnmountCount()
+ @Override
+ public long getHitCount()
{
- return unmountCount.get();
+ return hitCount.get();
}
- public void reject()
+ @Override
+ public long getLoadCount()
{
- rejectionCount.getAndIncrement();
+ return loadCount.get();
}
+ @Override
+ public long getLoadBytes()
+ {
+ return loadBytes.get();
+ }
+
+ @Override
+ public long getEvictionCount()
+ {
+ return evictionCount.get();
+ }
+
+ @Override
+ public long getEvictionBytes()
+ {
+ return evictionBytes.get();
+ }
+
+ @Override
public long getRejectCount()
{
return rejectionCount.get();
}
+
+ @VisibleForTesting
+ public long getUnmountCount()
+ {
+ return unmountCount.get();
+ }
}
}
diff --git
a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationStats.java
b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationStats.java
new file mode 100644
index 00000000000..d6fd58a5bc1
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationStats.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+public interface StorageLocationStats
+{
+ /**
+ * Current number of bytes stored at the time this measurement collection
was created.
+ */
+ long getUsedBytes();
+
+ /**
+ * Number of load operations during the measurement period
+ */
+ long getLoadCount();
+
+ /**
+ * Number of bytes loaded during the measurement period
+ */
+ long getLoadBytes();
+
+ /**
+ * Number of drop operations during the measurement period
+ */
+ long getDropCount();
+
+ /**
+ * Number of bytes dropped during the measurement period
+ */
+ long getDropBytes();
+}
diff --git
a/server/src/main/java/org/apache/druid/segment/loading/StorageStats.java
b/server/src/main/java/org/apache/druid/segment/loading/StorageStats.java
new file mode 100644
index 00000000000..bfd4dc65e2b
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/segment/loading/StorageStats.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import org.apache.druid.guice.annotations.UnstableApi;
+
+import java.util.Map;
+
+/**
+ * Collection of {@link StorageLocationStats} and {@link
VirtualStorageLocationStats} for all storage locations within
+ * a {@link SegmentCacheManager} so that {@link
SegmentCacheManager#getStorageStats()} can be used by
+ * {@link org.apache.druid.server.metrics.StorageMonitor} to track segment
cache activity.
+ * <p>
+ * Note that the stats are not tied explicitly to the {@link StorageLocation}
implementation used by
+ * {@link SegmentLocalCacheManager}, but it does implement this stuff.
+ */
+@UnstableApi
+public class StorageStats
+{
+ private final Map<String, StorageLocationStats> stats;
+ private final Map<String, VirtualStorageLocationStats> virtualStats;
+
+ public StorageStats(
+ final Map<String, StorageLocationStats> stats,
+ final Map<String, VirtualStorageLocationStats> virtualStats
+ )
+ {
+ this.stats = stats;
+ this.virtualStats = virtualStats;
+ }
+
+ /**
+ * Map of location label (such as file path) to {@link StorageLocationStats}
+ */
+ public Map<String, StorageLocationStats> getLocationStats()
+ {
+ return stats;
+ }
+
+ /**
+ * Map of location label (such as file path) to {@link
VirtualStorageLocationStats}
+ */
+ public Map<String, VirtualStorageLocationStats> getVirtualLocationStats()
+ {
+ return virtualStats;
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/segment/loading/VirtualStorageLocationStats.java
b/server/src/main/java/org/apache/druid/segment/loading/VirtualStorageLocationStats.java
new file mode 100644
index 00000000000..4929b60de3b
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/segment/loading/VirtualStorageLocationStats.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+public interface VirtualStorageLocationStats
+{
+ /**
+ * Current number of bytes stored which are managed as virtual storage at
the time this measurement collection was
+ * created.
+ */
+ long getUsedBytes();
+
+ /**
+ * Number of operations for which an entry was already present during the
measurement period
+ */
+ long getHitCount();
+
+ /**
+ * Number of operations for which an entry was missing and was loaded into
the cache during the measurement period
+ */
+ long getLoadCount();
+
+ /**
+ * Number of bytes loaded for entries missing from the cache during the
measurement period
+ */
+ long getLoadBytes();
+
+ /**
+ * Number of cache entries removed during the measurement period
+ */
+ long getEvictionCount();
+
+ /**
+ * Number of bytes removed from the cache during the measurement period
+ */
+ long getEvictionBytes();
+
+ /**
+ * Number of operations which could not be loaded due to insufficient space
during the measurement period
+ */
+ long getRejectCount();
+}
diff --git
a/server/src/main/java/org/apache/druid/server/metrics/StorageMonitor.java
b/server/src/main/java/org/apache/druid/server/metrics/StorageMonitor.java
new file mode 100644
index 00000000000..eac35dad205
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/metrics/StorageMonitor.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.inject.Inject;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.annotations.LoadScope;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.AbstractMonitor;
+import org.apache.druid.segment.loading.SegmentCacheManager;
+import org.apache.druid.segment.loading.StorageLocationStats;
+import org.apache.druid.segment.loading.StorageStats;
+import org.apache.druid.segment.loading.VirtualStorageLocationStats;
+
+import java.util.Map;
+
+/**
+ * Monitor to emit output of {@link SegmentCacheManager#getStorageStats()}
+ */
+@LoadScope(roles = {
+ NodeRole.BROKER_JSON_NAME,
+ NodeRole.HISTORICAL_JSON_NAME,
+ NodeRole.INDEXER_JSON_NAME,
+ NodeRole.PEON_JSON_NAME
+})
+public class StorageMonitor extends AbstractMonitor
+{
+ public static final String LOCATION_DIMENSION = "location";
+ public static final String USED_BYTES = "storage/used/bytes";
+ public static final String LOAD_COUNT = "storage/load/count";
+ public static final String LOAD_BYTES = "storage/load/bytes";
+ public static final String DROP_COUNT = "storage/drop/count";
+ public static final String DROP_BYTES = "storage/drop/bytes";
+ public static final String VSF_USED_BYTES = "storage/virtual/used/bytes";
+ public static final String VSF_HIT_COUNT = "storage/virtual/hit/count";
+ public static final String VSF_LOAD_COUNT = "storage/virtual/load/count";
+ public static final String VSF_LOAD_BYTES = "storage/virtual/load/bytes";
+ public static final String VSF_EVICT_COUNT = "storage/virtual/evict/count";
+ public static final String VSF_EVICT_BYTES = "storage/virtual/evict/bytes";
+ public static final String VSF_REJECT_COUNT = "storage/virtual/reject/count";
+
+ private final SegmentCacheManager cacheManager;
+
+ @Inject
+ public StorageMonitor(
+ SegmentCacheManager cacheManager
+ )
+ {
+ this.cacheManager = cacheManager;
+ }
+
+ @Override
+ public boolean doMonitor(ServiceEmitter emitter)
+ {
+ final StorageStats stats = cacheManager.getStorageStats();
+
+ if (stats != null) {
+ for (Map.Entry<String, StorageLocationStats> location :
stats.getLocationStats().entrySet()) {
+ final StorageLocationStats staticStats = location.getValue();
+ final ServiceMetricEvent.Builder builder = new
ServiceMetricEvent.Builder()
+ .setDimension(LOCATION_DIMENSION, location.getKey());
+ emitter.emit(builder.setMetric(USED_BYTES,
staticStats.getUsedBytes()));
+ emitter.emit(builder.setMetric(LOAD_COUNT,
staticStats.getLoadCount()));
+ emitter.emit(builder.setMetric(LOAD_BYTES,
staticStats.getLoadBytes()));
+ emitter.emit(builder.setMetric(DROP_COUNT,
staticStats.getDropCount()));
+ emitter.emit(builder.setMetric(DROP_BYTES,
staticStats.getDropBytes()));
+ }
+
+ for (Map.Entry<String, VirtualStorageLocationStats> location :
stats.getVirtualLocationStats().entrySet()) {
+ final VirtualStorageLocationStats weakStats = location.getValue();
+ final ServiceMetricEvent.Builder builder = new
ServiceMetricEvent.Builder().setDimension(
+ LOCATION_DIMENSION,
+ location.getKey()
+ );
+ emitter.emit(builder.setMetric(VSF_USED_BYTES,
weakStats.getUsedBytes()));
+ emitter.emit(builder.setMetric(VSF_HIT_COUNT,
weakStats.getHitCount()));
+ emitter.emit(builder.setMetric(VSF_LOAD_COUNT,
weakStats.getLoadCount()));
+ emitter.emit(builder.setMetric(VSF_LOAD_BYTES,
weakStats.getLoadBytes()));
+ emitter.emit(builder.setMetric(VSF_EVICT_COUNT,
weakStats.getEvictionCount()));
+ emitter.emit(builder.setMetric(VSF_EVICT_BYTES,
weakStats.getEvictionBytes()));
+ emitter.emit(builder.setMetric(VSF_REJECT_COUNT,
weakStats.getRejectCount()));
+ }
+ }
+ return true;
+ }
+}
diff --git
a/server/src/test/java/org/apache/druid/segment/loading/NoopSegmentCacheManager.java
b/server/src/test/java/org/apache/druid/segment/loading/NoopSegmentCacheManager.java
index 9ee49251f11..6121dcaf656 100644
---
a/server/src/test/java/org/apache/druid/segment/loading/NoopSegmentCacheManager.java
+++
b/server/src/test/java/org/apache/druid/segment/loading/NoopSegmentCacheManager.java
@@ -23,6 +23,7 @@ import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.timeline.DataSegment;
+import javax.annotation.Nullable;
import java.io.File;
import java.util.List;
import java.util.Optional;
@@ -116,4 +117,11 @@ public class NoopSegmentCacheManager implements
SegmentCacheManager
{
throw new UnsupportedOperationException();
}
+
  /**
   * Always returns null: this no-op test implementation manages no storage, so there are no stats to report.
   * Callers (e.g. StorageMonitor) must tolerate a null result.
   */
  @Nullable
  @Override
  public StorageStats getStorageStats()
  {
    return null;
  }
}
diff --git
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
index 4af946982f3..aa759c7e120 100644
---
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
+++
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
@@ -205,7 +205,7 @@ class SegmentLocalCacheManagerConcurrencyTest
virtualStorageManager.drop(segment);
}
for (StorageLocation location : virtualStorageManager.getLocations()) {
- location.resetStats();
+ location.resetWeakStats();
}
}
@@ -430,8 +430,8 @@ class SegmentLocalCacheManagerConcurrencyTest
// 5 because __drop path
Assertions.assertTrue(5 >= location.getPath().listFiles().length);
Assertions.assertTrue(5 >= location2.getPath().listFiles().length);
- Assertions.assertEquals(location.getStats().getEvictionCount(),
location.getStats().getUnmountCount());
- Assertions.assertEquals(location2.getStats().getEvictionCount(),
location2.getStats().getUnmountCount());
+ Assertions.assertEquals(location.getWeakStats().getEvictionCount(),
location.getWeakStats().getUnmountCount());
+ Assertions.assertEquals(location2.getWeakStats().getEvictionCount(),
location2.getWeakStats().getUnmountCount());
}
@Test
@@ -488,8 +488,8 @@ class SegmentLocalCacheManagerConcurrencyTest
}
}
- Assertions.assertTrue(location.getStats().getHitCount() >= 0);
- Assertions.assertTrue(location2.getStats().getHitCount() >= 0);
+ Assertions.assertTrue(location.getWeakStats().getHitCount() >= 0);
+ Assertions.assertTrue(location2.getWeakStats().getHitCount() >= 0);
assertNoLooseEnds();
}
@@ -564,8 +564,8 @@ class SegmentLocalCacheManagerConcurrencyTest
Assertions.assertTrue(totalSuccess > ((iterations / 10) * minLoadCount));
// expect at least some empties from the segment not being cached
Assertions.assertTrue(totalEmpty > 0);
- Assertions.assertTrue(location.getStats().getHitCount() >= 0);
- Assertions.assertTrue(location2.getStats().getHitCount() >= 0);
+ Assertions.assertTrue(location.getWeakStats().getHitCount() >= 0);
+ Assertions.assertTrue(location2.getWeakStats().getHitCount() >= 0);
assertNoLooseEnds();
}
@@ -599,17 +599,17 @@ class SegmentLocalCacheManagerConcurrencyTest
totalFailures += result.exceptions.size();
Assertions.assertEquals(
totalSuccess,
- location.getStats().getLoadCount() +
- location.getStats().getHitCount() +
- location2.getStats().getLoadCount() +
- location2.getStats().getHitCount(),
+ location.getWeakStats().getLoadCount() +
+ location.getWeakStats().getHitCount() +
+ location2.getWeakStats().getLoadCount() +
+ location2.getWeakStats().getHitCount(),
StringUtils.format(
"iteration[%s] - loc1: loads[%s] hits[%s] loc2: loads[%s]
hits[%s]",
i,
- location.getStats().getLoadCount(),
- location.getStats().getHitCount(),
- location2.getStats().getLoadCount(),
- location2.getStats().getHitCount()
+ location.getWeakStats().getLoadCount(),
+ location.getWeakStats().getHitCount(),
+ location2.getWeakStats().getLoadCount(),
+ location2.getWeakStats().getHitCount()
)
);
@@ -626,20 +626,20 @@ class SegmentLocalCacheManagerConcurrencyTest
Assertions.assertEquals(iterations, totalSuccess + totalFailures);
Assertions.assertEquals(
totalSuccess,
- location.getStats().getLoadCount()
- + location.getStats().getHitCount()
- + location2.getStats().getLoadCount()
- + location2.getStats().getHitCount()
+ location.getWeakStats().getLoadCount()
+ + location.getWeakStats().getHitCount()
+ + location2.getWeakStats().getLoadCount()
+ + location2.getWeakStats().getHitCount()
);
- Assertions.assertTrue(totalFailures <=
location.getStats().getRejectCount() + location2.getStats()
-
.getRejectCount());
+ Assertions.assertTrue(totalFailures <=
location.getWeakStats().getRejectCount() + location2.getWeakStats()
+
.getRejectCount());
if (expectHits) {
- Assertions.assertTrue(location.getStats().getHitCount() >= 0);
- Assertions.assertTrue(location2.getStats().getHitCount() >= 0);
+ Assertions.assertTrue(location.getWeakStats().getHitCount() >= 0);
+ Assertions.assertTrue(location2.getWeakStats().getHitCount() >= 0);
} else {
- Assertions.assertEquals(0, location.getStats().getHitCount());
- Assertions.assertEquals(0, location2.getStats().getHitCount());
+ Assertions.assertEquals(0, location.getWeakStats().getHitCount());
+ Assertions.assertEquals(0, location2.getWeakStats().getHitCount());
}
assertNoLooseEnds();
@@ -719,14 +719,14 @@ class SegmentLocalCacheManagerConcurrencyTest
// 5 because __drop path
Assertions.assertTrue(5 >= location.getPath().listFiles().length);
Assertions.assertTrue(5 >= location2.getPath().listFiles().length);
- Assertions.assertTrue(location.getStats().getLoadCount() >= 4);
- Assertions.assertTrue(location2.getStats().getLoadCount() >= 4);
- Assertions.assertEquals(location.getStats().getEvictionCount(),
location.getStats().getUnmountCount());
- Assertions.assertEquals(location2.getStats().getEvictionCount(),
location2.getStats().getUnmountCount());
- Assertions.assertEquals(location.getStats().getLoadCount() - 4,
location.getStats().getEvictionCount());
- Assertions.assertEquals(location2.getStats().getLoadCount() - 4,
location2.getStats().getEvictionCount());
- Assertions.assertEquals(location.getStats().getLoadCount() - 4,
location.getStats().getUnmountCount());
- Assertions.assertEquals(location2.getStats().getLoadCount() - 4,
location2.getStats().getUnmountCount());
+ Assertions.assertTrue(location.getWeakStats().getLoadCount() >= 4);
+ Assertions.assertTrue(location2.getWeakStats().getLoadCount() >= 4);
+ Assertions.assertEquals(location.getWeakStats().getEvictionCount(),
location.getWeakStats().getUnmountCount());
+ Assertions.assertEquals(location2.getWeakStats().getEvictionCount(),
location2.getWeakStats().getUnmountCount());
+ Assertions.assertEquals(location.getWeakStats().getLoadCount() - 4,
location.getWeakStats().getEvictionCount());
+ Assertions.assertEquals(location2.getWeakStats().getLoadCount() - 4,
location2.getWeakStats().getEvictionCount());
+ Assertions.assertEquals(location.getWeakStats().getLoadCount() - 4,
location.getWeakStats().getUnmountCount());
+ Assertions.assertEquals(location2.getWeakStats().getLoadCount() - 4,
location2.getWeakStats().getUnmountCount());
}
private void makeSegmentsToLoad(
diff --git
a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
index 5d8b110e17a..75949084db3 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
@@ -112,7 +112,8 @@ public class LatchableEmitter extends StubServiceEmitter
}
/**
- * Waits until an event that satisfies the given predicate is emitted.
+ * Waits until an event that satisfies the given predicate is emitted,
considering all previously observed events
+ * since the last call to {@link #flush()} as potential candidates for match
as well.
*
* @param condition condition to wait for
* @param timeoutMillis timeout, or negative to not use a timeout
@@ -137,15 +138,56 @@ public class LatchableEmitter extends StubServiceEmitter
}
/**
- * Wait until a metric event that matches the given condition is emitted.
+ * Waits until the next event that satisfies the given predicate is emitted.
This method should only be called
+ * if the caller is certain that a metric matching the condition is going to
be emitted, such as from a monitor.
+ *
+ * @param condition condition to wait for
+ * @param timeoutMillis timeout, or negative to not use a timeout
+ */
+ public void waitForNextEvent(Predicate<Event> condition, long timeoutMillis)
+ {
+ final WaitCondition waitCondition = new WaitCondition(condition);
+ registerWaitConditionNextEvent(waitCondition);
+
+ try {
+ final long awaitTime = timeoutMillis >= 0 ? timeoutMillis :
Long.MAX_VALUE;
+ if (!waitCondition.countDownLatch.await(awaitTime,
TimeUnit.MILLISECONDS)) {
+ throw new ISE("Timed out waiting for next event");
+ }
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ finally {
+ waitConditions.remove(waitCondition);
+ }
+ }
+
+ /**
+ * Wait until a metric event that matches the given condition is emitted,
considering all previously observed events
+ * since the last call to {@link #flush()} as potential candidates for match
as well.
* Uses the {@link LatchableEmitterConfig#defaultWaitTimeoutMillis}.
*/
public ServiceMetricEvent waitForEvent(UnaryOperator<EventMatcher> condition)
{
  // build the matcher once; it records the first matching event so it can be returned below
  final EventMatcher matcher = condition.apply(new EventMatcher());
  waitForEvent(
      // only metric events are candidates; other event types never match
      event -> event instanceof ServiceMetricEvent && matcher.test((ServiceMetricEvent) event),
      defaultWaitTimeoutMillis
  );
  return matcher.matchingEvent.get();
}
+
+ /**
+ * Wait until the next metric event that matches the given condition is
emitted. This method should only be called
+ * if the caller is certain that a metric matching the condition is going to
be emitted, such as from a monitor.
+ * Uses the {@link LatchableEmitterConfig#defaultWaitTimeoutMillis}.
+ */
+ public ServiceMetricEvent waitForNextEvent(UnaryOperator<EventMatcher>
condition)
+ {
+ final EventMatcher matcher = condition.apply(new EventMatcher());
+ waitForNextEvent(
+ event -> event instanceof ServiceMetricEvent &&
matcher.test((ServiceMetricEvent) event),
defaultWaitTimeoutMillis
);
return matcher.matchingEvent.get();
@@ -227,6 +269,17 @@ public class LatchableEmitter extends StubServiceEmitter
}
}
+ /**
+ * Evaluates the given new condition for all past events and then adds it to
+ * {@link #waitConditions}.
+ */
+ private void registerWaitConditionNextEvent(WaitCondition condition)
+ {
+ eventProcessingLock.lock();
+ waitConditions.add(condition);
+ eventProcessingLock.unlock();
+ }
+
private static class WaitCondition
{
private final Predicate<Event> predicate;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]