This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7aec41b8feb Minor clean up of LatchableEmitter / StubServiceEmitter (#18255)
7aec41b8feb is described below

commit 7aec41b8feb51efc8ce286b6b136b12c80537a22
Author: Kashif Faraz <[email protected]>
AuthorDate: Wed Jul 16 19:43:12 2025 +0530

    Minor clean up of LatchableEmitter / StubServiceEmitter (#18255)
    
    Follow up to #18249
    
    Changes:
    - Maintain a List of processed events in `LatchableEmitter`.
      This is an improvement over the current flow, where a copy of the events
      is created upon receiving every new event.
    - When a new condition is registered, evaluate all past events upfront,
      then add it to the set of wait conditions.
    - Evaluate each new event as it is received.
    
    Other changes:
    - Hide the internal queue implementation of `StubServiceEmitter` from tests
      and sub-classes.
    - Reduce the usage of `StubServiceEmitter.getEvents()`. Use the inbuilt
      `verifyValue` methods instead.
---
 .../overlord/common/KubernetesPeonClientTest.java  |   2 +-
 .../overlord/duty/UnusedSegmentsKillerTest.java    |   5 +-
 .../SeekableStreamSupervisorSpecTest.java          |   9 +-
 .../SeekableStreamSupervisorStateTest.java         |   2 +-
 .../worker/shuffle/ShuffleMonitorTest.java         |  29 ++----
 .../util/emitter/service/ServiceMetricEvent.java   |   8 ++
 .../emitter/service/ServiceMetricEventTest.java    |  29 +++++-
 .../java/util/metrics/CgroupCpuSetMonitorTest.java |  23 ++---
 .../java/util/metrics/CgroupDiskMonitorTest.java   |   4 +-
 .../java/util/metrics/CgroupMemoryMonitorTest.java |   5 +-
 .../java/util/metrics/CgroupV2CpuMonitorTest.java  |   5 +-
 .../java/util/metrics/CgroupV2DiskMonitorTest.java |   4 +-
 .../java/util/metrics/CpuAcctDeltaMonitorTest.java |   4 +-
 .../util/metrics/HttpPostEmitterMonitorTest.java   |  39 +++-----
 .../java/util/metrics/OshiSysMonitorTest.java      |  26 +++---
 .../java/util/metrics/StubServiceEmitter.java      |  52 +++--------
 .../druid/query/CPUTimeMetricQueryRunnerTest.java  |   2 +-
 .../druid/query/DefaultQueryMetricsTest.java       |   2 +-
 .../druid/client/cache/MemcachedCacheTest.java     |  12 +--
 .../curator/DruidConnectionStateListenerTest.java  |  12 +--
 .../appenderator/StreamAppenderatorTest.java       |  17 ++--
 .../druid/server/ClientQuerySegmentWalkerTest.java |  35 ++++---
 .../druid/server/audit/SQLAuditManagerTest.java    |  17 +---
 .../lookup/cache/LookupCoordinatorManagerTest.java |   2 +-
 .../server/metrics/GroupByStatsMonitorTest.java    |  25 ++---
 .../metrics/HistoricalMetricsMonitorTest.java      |  73 +++++----------
 .../druid/server/metrics/LatchableEmitter.java     | 101 ++++++++++++---------
 .../server/metrics/QueryCountStatsMonitorTest.java |  41 +++------
 .../server/metrics/ServiceStatusMonitorTest.java   |  27 ++----
 .../server/metrics/TaskCountStatsMonitorTest.java  |   2 +-
 .../metrics/TaskSlotCountStatsMonitorTest.java     |   2 +-
 .../metrics/WorkerTaskCountStatsMonitorTest.java   |   8 +-
 .../server/AsyncQueryForwardingServletTest.java    |   3 +-
 .../embedded/emitter/LatchableEmitterModule.java   |   6 +-
 34 files changed, 270 insertions(+), 363 deletions(-)

diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
index 4d4e1cd9a1b..9c831217914 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
@@ -93,7 +93,7 @@ public class KubernetesPeonClientTest
     Pod peonPod = instance.launchPeonJobAndWaitForStart(job, 
NoopTask.create(), 1, TimeUnit.SECONDS);
 
     Assertions.assertNotNull(peonPod);
-    Assertions.assertEquals(1, serviceEmitter.getEvents().size());
+    Assertions.assertEquals(1, serviceEmitter.getNumEmittedEvents());
   }
 
   @Test
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java
index 111489a74dc..0cc5f6f5f23 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.indexing.test.TestDataSegmentKiller;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
 import org.apache.druid.metadata.UnusedSegmentKillerConfig;
@@ -52,7 +53,6 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.List;
-import java.util.Queue;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -329,8 +329,7 @@ public class UnusedSegmentsKillerTest
     emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10);
 
     // Verify that the kill intervals are sorted with the oldest interval first
-    final Queue<StubServiceEmitter.ServiceMetricEventSnapshot> events =
-        emitter.getMetricEvents().get(TaskMetrics.RUN_DURATION);
+    final List<ServiceMetricEvent> events = 
emitter.getMetricEvents(TaskMetrics.RUN_DURATION);
     final List<Interval> killIntervals = events.stream().map(event -> {
       final String taskId = (String) 
event.getUserDims().get(DruidMetrics.TASK_ID);
       String[] splits = taskId.split("_");
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index cfd49994262..a663c7c19aa 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -780,8 +780,7 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     Assert.assertEquals(2, taskCountAfterScaleOut);
     Assert.assertTrue(
         dynamicActionEmitter
-            .getMetricEvents()
-            .get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
+            
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
             .stream()
             .map(metric -> 
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
             .filter(Objects::nonNull)
@@ -840,8 +839,7 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
 
     Assert.assertTrue(
         dynamicActionEmitter
-            .getMetricEvents()
-            .get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
+            
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
             .stream()
             .map(metric -> 
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
             .filter(Objects::nonNull)
@@ -1103,8 +1101,7 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
 
     Assert.assertTrue(
         dynamicActionEmitter
-            .getMetricEvents()
-            .get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
+            
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
             .stream()
             .map(metric -> 
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
             .filter(Objects::nonNull)
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 4ae5a44c1c8..feba656ee63 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -2521,7 +2521,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     latch.await();
 
     supervisor.emitLag();
-    Assert.assertEquals(0, emitter.getEvents().size());
+    Assert.assertEquals(0, emitter.getNumEmittedEvents());
   }
 
   private void validateSupervisorStateAfterResetOffsets(
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitorTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitorTest.java
index 1174bc842ed..6ee98a8d309 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitorTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitorTest.java
@@ -21,14 +21,12 @@ package org.apache.druid.indexing.worker.shuffle;
 
 import com.google.common.collect.ImmutableMap;
 import 
org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics;
-import org.apache.druid.java.util.emitter.core.Event;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import java.util.List;
+import java.util.Map;
 
 public class ShuffleMonitorTest
 {
@@ -46,23 +44,16 @@ public class ShuffleMonitorTest
     final ShuffleMonitor monitor = new ShuffleMonitor();
     monitor.setShuffleMetrics(shuffleMetrics);
     Assert.assertTrue(monitor.doMonitor(emitter));
-    final List<Event> events = emitter.getEvents();
-    Assert.assertEquals(2, events.size());
-    Assert.assertSame(ServiceMetricEvent.class, events.get(0).getClass());
-    ServiceMetricEvent event = (ServiceMetricEvent) events.get(0);
-    Assert.assertEquals(ShuffleMonitor.SHUFFLE_BYTES_KEY, event.getMetric());
-    Assert.assertEquals(310L, event.getValue());
-    Assert.assertEquals(
-        ImmutableMap.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, 
"supervisor"),
-        event.getUserDims()
+    Assert.assertEquals(2, emitter.getNumEmittedEvents());
+    emitter.verifyValue(
+        ShuffleMonitor.SHUFFLE_BYTES_KEY,
+        Map.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, "supervisor"),
+        310L
     );
-    Assert.assertSame(ServiceMetricEvent.class, events.get(1).getClass());
-    event = (ServiceMetricEvent) events.get(1);
-    Assert.assertEquals(ShuffleMonitor.SHUFFLE_REQUESTS_KEY, 
event.getMetric());
-    Assert.assertEquals(3, event.getValue());
-    Assert.assertEquals(
-        ImmutableMap.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, 
"supervisor"),
-        event.getUserDims()
+    emitter.verifyValue(
+        ShuffleMonitor.SHUFFLE_REQUESTS_KEY,
+        Map.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, "supervisor"),
+        3
     );
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java
 
b/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java
index 3bab9a3ad04..f7f3549ecf7 100644
--- 
a/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java
+++ 
b/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java
@@ -125,6 +125,14 @@ public class ServiceMetricEvent implements Event
         .build();
   }
 
+  /**
+   * Creates an immutable copy of this metric event. This is used only in 
tests.
+   */
+  public ServiceMetricEvent copy()
+  {
+    return new ServiceMetricEvent(createdTime, serviceDims, 
Map.copyOf(userDims), feed, metric, value);
+  }
+
   /**
    * Builder for a {@link ServiceMetricEvent}. This builder can be used for
    * building only one event.
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java
 
b/processing/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java
index 5fbf4dc9c9e..cf59f0f3c40 100644
--- 
a/processing/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java
+++ 
b/processing/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java
@@ -35,7 +35,7 @@ import java.util.Map;
 public class ServiceMetricEventTest
 {
   @Test
-  public void testStupidTest()
+  public void testBuilder()
   {
     ServiceMetricEvent builderEvent = new ServiceMetricEvent.Builder()
         .setDimension("user1", "a")
@@ -317,4 +317,31 @@ public class ServiceMetricEventTest
     Assert.assertTrue(target.getUserDims().isEmpty());
     Assert.assertNull(target.getUserDims().get("userDimMap"));
   }
+
+  @Test
+  public void test_copy_returnsAnImmutableInstance()
+  {
+    final ServiceMetricEvent.Builder eventBuilder = ServiceMetricEvent
+        .builder()
+        .setDimension("dim1", "v1")
+        .setMetric("m1", 100);
+
+    final ServiceMetricEvent event1 = eventBuilder.build("coordinator", 
"localhost");
+    final ServiceMetricEvent event1Copy = event1.copy();
+
+    Assert.assertEquals(Map.of("dim1", "v1"), event1.getUserDims());
+    Assert.assertEquals(Map.of("dim1", "v1"), event1Copy.getUserDims());
+
+    final ServiceMetricEvent event2 = eventBuilder
+        .setDimension("dim2", "v2")
+        .setMetric("m2", 200)
+        .build("coordinator", "localhost");
+
+    // Verify that the original event gets changed dimensions
+    Assert.assertEquals(Map.of("dim1", "v1", "dim2", "v2"), 
event2.getUserDims());
+    Assert.assertEquals(Map.of("dim1", "v1", "dim2", "v2"), 
event1.getUserDims());
+
+    // But the event copy still has the original dimensions
+    Assert.assertEquals(Map.of("dim1", "v1"), event1Copy.getUserDims());
+  }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java
 
b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java
index 027bd995b33..b18aa138fa5 100644
--- 
a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java
+++ 
b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java
@@ -21,7 +21,6 @@ package org.apache.druid.java.util.metrics;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.java.util.common.FileUtils;
-import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
 import org.apache.druid.java.util.metrics.cgroups.ProcCgroupDiscoverer;
 import org.apache.druid.java.util.metrics.cgroups.TestUtils;
@@ -34,8 +33,6 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.List;
-import java.util.Map;
 
 public class CgroupCpuSetMonitorTest
 {
@@ -72,19 +69,11 @@ public class CgroupCpuSetMonitorTest
     final CgroupCpuSetMonitor monitor = new CgroupCpuSetMonitor(discoverer, 
ImmutableMap.of(), "some_feed");
     final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
     Assert.assertTrue(monitor.doMonitor(emitter));
-    final List<Event> actualEvents = emitter.getEvents();
-    Assert.assertEquals(4, actualEvents.size());
-    final Map<String, Object> cpusEvent = actualEvents.get(0).toMap();
-    final Map<String, Object> effectiveCpusEvent = actualEvents.get(1).toMap();
-    final Map<String, Object> memsEvent = actualEvents.get(2).toMap();
-    final Map<String, Object> effectiveMemsEvent = actualEvents.get(3).toMap();
-    Assert.assertEquals("cgroup/cpuset/cpu_count", cpusEvent.get("metric"));
-    Assert.assertEquals(8, cpusEvent.get("value"));
-    Assert.assertEquals("cgroup/cpuset/effective_cpu_count", 
effectiveCpusEvent.get("metric"));
-    Assert.assertEquals(7, effectiveCpusEvent.get("value"));
-    Assert.assertEquals("cgroup/cpuset/mems_count", memsEvent.get("metric"));
-    Assert.assertEquals(4, memsEvent.get("value"));
-    Assert.assertEquals("cgroup/cpuset/effective_mems_count", 
effectiveMemsEvent.get("metric"));
-    Assert.assertEquals(1, effectiveMemsEvent.get("value"));
+    Assert.assertEquals(4, emitter.getNumEmittedEvents());
+
+    emitter.verifyValue("cgroup/cpuset/cpu_count", 8);
+    emitter.verifyValue("cgroup/cpuset/effective_cpu_count", 7);
+    emitter.verifyValue("cgroup/cpuset/mems_count", 4);
+    emitter.verifyValue("cgroup/cpuset/effective_mems_count", 1);
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java
 
b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java
index 21a83458729..d75af320f12 100644
--- 
a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java
+++ 
b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java
@@ -68,13 +68,13 @@ public class CgroupDiskMonitorTest
     final CgroupDiskMonitor monitor = new CgroupDiskMonitor(discoverer, 
ImmutableMap.of(), "some_feed");
     final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
     Assert.assertTrue(monitor.doMonitor(emitter));
-    Assert.assertEquals(0, emitter.getEvents().size());
+    Assert.assertEquals(0, emitter.getNumEmittedEvents());
 
     TestUtils.copyOrReplaceResource("/blkio.throttle.io_service_bytes-2", 
serviceBytesFile);
     TestUtils.copyOrReplaceResource("/blkio.throttle.io_serviced-2", 
servicedFile);
 
     Assert.assertTrue(monitor.doMonitor(emitter));
-    Assert.assertEquals(8, emitter.getEvents().size());
+    Assert.assertEquals(8, emitter.getNumEmittedEvents());
     Assert.assertTrue(
         emitter
             .getEvents()
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java
 
b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java
index 7827368ec9f..6538cb9691a 100644
--- 
a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java
+++ 
b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java
@@ -21,7 +21,6 @@ package org.apache.druid.java.util.metrics;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.java.util.common.FileUtils;
-import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
 import org.apache.druid.java.util.metrics.cgroups.ProcCgroupDiscoverer;
 import org.apache.druid.java.util.metrics.cgroups.TestUtils;
@@ -34,7 +33,6 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.List;
 
 public class CgroupMemoryMonitorTest
 {
@@ -71,7 +69,6 @@ public class CgroupMemoryMonitorTest
     final CgroupMemoryMonitor monitor = new CgroupMemoryMonitor(discoverer, 
ImmutableMap.of(), "some_feed");
     final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
     Assert.assertTrue(monitor.doMonitor(emitter));
-    final List<Event> actualEvents = emitter.getEvents();
-    Assert.assertEquals(46, actualEvents.size());
+    Assert.assertEquals(46, emitter.getNumEmittedEvents());
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java
 
b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java
index 7006de1edb0..07682643148 100644
--- 
a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java
+++ 
b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java
@@ -20,7 +20,6 @@
 package org.apache.druid.java.util.metrics;
 
 import com.google.common.collect.ImmutableSet;
-import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
 import org.apache.druid.java.util.metrics.cgroups.ProcCgroupV2Discoverer;
 import org.apache.druid.java.util.metrics.cgroups.TestUtils;
@@ -33,7 +32,6 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.List;
 import java.util.stream.Collectors;
 
 public class CgroupV2CpuMonitorTest
@@ -65,8 +63,7 @@ public class CgroupV2CpuMonitorTest
     final CgroupV2CpuMonitor monitor = new CgroupV2CpuMonitor(discoverer);
     final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
     Assert.assertTrue(monitor.doMonitor(emitter));
-    final List<Event> actualEvents = emitter.getEvents();
-    Assert.assertEquals(0, actualEvents.size());
+    Assert.assertEquals(0, emitter.getNumEmittedEvents());
 
     emitter.flush();
 
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java
 
b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java
index 40201a30feb..f9a977edd92 100644
--- 
a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java
+++ 
b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java
@@ -61,14 +61,14 @@ public class CgroupV2DiskMonitorTest
     final CgroupV2DiskMonitor monitor = new CgroupV2DiskMonitor(discoverer);
     final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
     Assert.assertTrue(monitor.doMonitor(emitter));
-    Assert.assertEquals(0, emitter.getEvents().size());
+    Assert.assertEquals(0, emitter.getNumEmittedEvents());
 
     emitter.flush();
 
     TestUtils.copyOrReplaceResource("/cgroupv2/io.stat-2", statFile);
 
     Assert.assertTrue(monitor.doMonitor(emitter));
-    Assert.assertEquals(4, emitter.getEvents().size());
+    Assert.assertEquals(4, emitter.getNumEmittedEvents());
     Assert.assertTrue(
         emitter
             .getEvents()
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java
 
b/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java
index 97dd707299e..94cb1d9b3aa 100644
--- 
a/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java
+++ 
b/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java
@@ -94,10 +94,10 @@ public class CpuAcctDeltaMonitorTest
     final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
     Assert.assertFalse(monitor.doMonitor(emitter));
     // First should just cache
-    Assert.assertEquals(0, emitter.getEvents().size());
+    Assert.assertEquals(0, emitter.getNumEmittedEvents());
     Assert.assertTrue(cpuacct.delete());
     TestUtils.copyResource("/cpuacct.usage_all", cpuacct);
     Assert.assertTrue(monitor.doMonitor(emitter));
-    Assert.assertEquals(2 * 128 + 1, emitter.getEvents().size());
+    Assert.assertEquals(2 * 128 + 1, emitter.getNumEmittedEvents());
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java
 
b/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java
index 2be52c091e7..819c9ba50d7 100644
--- 
a/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java
+++ 
b/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java
@@ -25,10 +25,6 @@ import 
org.apache.druid.java.util.emitter.core.HttpPostEmitter;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.Map;
-import java.util.Queue;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -65,26 +61,19 @@ public class HttpPostEmitterMonitorTest
 
     assertTrue(monitor.doMonitor(stubServiceEmitter));
 
-    final Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> 
metricEvents = stubServiceEmitter.getMetricEvents();
-
-    assertMetricValue(metricEvents, "emitter/successfulSending/maxTimeMs", 0);
-    assertMetricValue(metricEvents, "emitter/events/emitted/delta", 100L);
-    assertMetricValue(metricEvents, "emitter/successfulSending/minTimeMs", 0);
-    assertMetricValue(metricEvents, "emitter/buffers/emitQueue", 30);
-    assertMetricValue(metricEvents, "emitter/failedSending/minTimeMs", 0);
-    assertMetricValue(metricEvents, "emitter/buffers/allocated/delta", 20);
-    assertMetricValue(metricEvents, "emitter/batchFilling/maxTimeMs", 0);
-    assertMetricValue(metricEvents, "emitter/buffers/dropped/delta", 10);
-    assertMetricValue(metricEvents, "emitter/batchFilling/minTimeMs", 0);
-    assertMetricValue(metricEvents, "emitter/events/emitQueue", 200L);
-    assertMetricValue(metricEvents, "emitter/events/large/emitQueue", 75L);
-    assertMetricValue(metricEvents, "emitter/buffers/reuseQueue", 15);
-    assertMetricValue(metricEvents, "emitter/buffers/failed/delta", 5);
-    assertMetricValue(metricEvents, "emitter/failedSending/maxTimeMs", 0L);
-  }
-
-  private void assertMetricValue(Map<String, 
Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents, String 
metricName, Number expectedValue)
-  {
-    
assertEquals(metricEvents.get(metricName).peek().getMetricEvent().getValue().doubleValue(),
 expectedValue.doubleValue());
+    stubServiceEmitter.verifyValue("emitter/successfulSending/maxTimeMs", 0);
+    stubServiceEmitter.verifyValue("emitter/events/emitted/delta", 100L);
+    stubServiceEmitter.verifyValue("emitter/successfulSending/minTimeMs", 0);
+    stubServiceEmitter.verifyValue("emitter/buffers/emitQueue", 30);
+    stubServiceEmitter.verifyValue("emitter/failedSending/minTimeMs", 0);
+    stubServiceEmitter.verifyValue("emitter/buffers/allocated/delta", 20);
+    stubServiceEmitter.verifyValue("emitter/batchFilling/maxTimeMs", 0);
+    stubServiceEmitter.verifyValue("emitter/buffers/dropped/delta", 10);
+    stubServiceEmitter.verifyValue("emitter/batchFilling/minTimeMs", 0);
+    stubServiceEmitter.verifyValue("emitter/events/emitQueue", 200L);
+    stubServiceEmitter.verifyValue("emitter/events/large/emitQueue", 75L);
+    stubServiceEmitter.verifyValue("emitter/buffers/reuseQueue", 15);
+    stubServiceEmitter.verifyValue("emitter/buffers/failed/delta", 5);
+    stubServiceEmitter.verifyValue("emitter/failedSending/maxTimeMs", 0);
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java
 
b/processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java
index ece54cd3fa6..c31ee8ea776 100644
--- 
a/processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java
+++ 
b/processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java
@@ -107,7 +107,7 @@ public class OshiSysMonitorTest
     m.start();
     m.monitorMemStats(emitter);
     m.stop();
-    Assert.assertEquals(3, emitter.getEvents().size());
+    Assert.assertEquals(3, emitter.getNumEmittedEvents());
     emitter.verifyEmitted("sys/mem/max", 1);
     emitter.verifyEmitted("sys/mem/used", 1);
     emitter.verifyEmitted("sys/mem/free", 1);
@@ -129,7 +129,7 @@ public class OshiSysMonitorTest
     m.start();
     m.doMonitor(emitter);
     m.stop();
-    Assert.assertEquals(3, emitter.getEvents().size());
+    Assert.assertEquals(3, emitter.getNumEmittedEvents());
     emitter.verifyEmitted("sys/mem/max", 1);
     emitter.verifyEmitted("sys/mem/used", 1);
     emitter.verifyEmitted("sys/mem/free", 1);
@@ -153,7 +153,7 @@ public class OshiSysMonitorTest
     OshiSysMonitor m = createMonitor(si);
     m.start();
     m.monitorSwapStats(emitter);
-    Assert.assertEquals(4, emitter.getEvents().size());
+    Assert.assertEquals(4, emitter.getNumEmittedEvents());
     emitter.verifyEmitted("sys/swap/pageIn", 1);
     emitter.verifyEmitted("sys/swap/pageOut", 1);
     emitter.verifyEmitted("sys/swap/max", 1);
@@ -201,7 +201,7 @@ public class OshiSysMonitorTest
     OshiSysMonitor m = createMonitor(si);
     m.start();
     m.monitorFsStats(emitter);
-    Assert.assertEquals(8, emitter.getEvents().size());
+    Assert.assertEquals(8, emitter.getNumEmittedEvents());
     emitter.verifyEmitted("sys/fs/max", 2);
     emitter.verifyEmitted("sys/fs/used", 2);
     emitter.verifyEmitted("sys/fs/files/count", 2);
@@ -272,7 +272,7 @@ public class OshiSysMonitorTest
     OshiSysMonitor m = createMonitor(si);
     m.start();
     m.monitorDiskStats(emitter);
-    Assert.assertEquals(0, emitter.getEvents().size());
+    Assert.assertEquals(0, emitter.getNumEmittedEvents());
 
     Mockito.when(disk1.getReadBytes()).thenReturn(400L);
     Mockito.when(disk1.getReads()).thenReturn(220L);
@@ -288,7 +288,7 @@ public class OshiSysMonitorTest
     Mockito.when(disk2.getTransferTime()).thenReturn(1100L);
 
     m.monitorDiskStats(emitter);
-    Assert.assertEquals(12, emitter.getEvents().size());
+    Assert.assertEquals(12, emitter.getNumEmittedEvents());
 
     Map<String, Object> userDims1 = ImmutableMap.of(
         "diskName",
@@ -362,7 +362,7 @@ public class OshiSysMonitorTest
     OshiSysMonitor m = createMonitor(si);
     m.start();
     m.monitorNetStats(emitter);
-    Assert.assertEquals(0, emitter.getEvents().size());
+    Assert.assertEquals(0, emitter.getNumEmittedEvents());
 
     Mockito.when(net1.getBytesRecv()).thenReturn(400L);
     Mockito.when(net1.getPacketsRecv()).thenReturn(220L);
@@ -375,7 +375,7 @@ public class OshiSysMonitorTest
 
 
     m.monitorNetStats(emitter);
-    Assert.assertEquals(16, emitter.getEvents().size()); // 8 * 2 whitelisted 
ips
+    Assert.assertEquals(16, emitter.getNumEmittedEvents()); // 8 * 2 
whitelisted ips
 
     Map<String, Object> userDims1 = ImmutableMap.of(
         "netName",
@@ -460,7 +460,7 @@ public class OshiSysMonitorTest
     OshiSysMonitor m = createMonitor(si);
     m.start();
     m.monitorCpuStats(emitter);
-    Assert.assertEquals(0, emitter.getEvents().size());
+    Assert.assertEquals(0, emitter.getNumEmittedEvents());
 
     long[][] procTicks2 = new long[][]{
         {4L, 5L, 6L, 8L, 9L, 7L, 10L, 12L},     // Δtick1 {3,3,3,4,4,1,3,4} 
_total = 25, emitted percentage
@@ -470,7 +470,7 @@ public class OshiSysMonitorTest
 
     m.monitorCpuStats(emitter);
     m.stop();
-    Assert.assertEquals(16, emitter.getEvents().size()); // 8 ticktype * 2 
processors
+    Assert.assertEquals(16, emitter.getNumEmittedEvents()); // 8 ticktype * 2 
processors
 
     Map<String, Object> userDims = new HashMap<>();
     userDims.put("cpuName", "0");
@@ -557,7 +557,7 @@ public class OshiSysMonitorTest
     OshiSysMonitor m = createMonitor(si);
     m.start();
     m.monitorSysStats(emitter);
-    Assert.assertEquals(4, emitter.getEvents().size());
+    Assert.assertEquals(4, emitter.getNumEmittedEvents());
     m.stop();
     emitter.verifyEmitted("sys/uptime", 1);
     emitter.verifyEmitted("sys/la/1", 1);
@@ -592,7 +592,7 @@ public class OshiSysMonitorTest
     m.start();
     m.monitorTcpStats(emitter);
 
-    Assert.assertEquals(0, emitter.getEvents().size());
+    Assert.assertEquals(0, emitter.getNumEmittedEvents());
     Mockito.when(tcpv4.getConnectionsActive()).thenReturn(20L);
     Mockito.when(tcpv4.getConnectionsPassive()).thenReturn(25L);
     Mockito.when(tcpv4.getConnectionFailures()).thenReturn(8L);
@@ -604,7 +604,7 @@ public class OshiSysMonitorTest
     Mockito.when(tcpv4.getSegmentsRetransmitted()).thenReturn(8L);
     m.monitorTcpStats(emitter);
     m.stop();
-    Assert.assertEquals(9, emitter.getEvents().size());
+    Assert.assertEquals(9, emitter.getNumEmittedEvents());
     emitter.verifyValue("sys/tcpv4/activeOpens", 10L);
     emitter.verifyValue("sys/tcpv4/passiveOpens", 5L);
     emitter.verifyValue("sys/tcpv4/attemptFails", 3L);
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
 
b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
index 55113b97ac2..b330573c362 100644
--- 
a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
+++ 
b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
@@ -41,7 +41,7 @@ public class StubServiceEmitter extends ServiceEmitter 
implements MetricsVerifie
 {
   private final Queue<Event> events = new ConcurrentLinkedDeque<>();
   private final Queue<AlertEvent> alertEvents = new ConcurrentLinkedDeque<>();
-  private final ConcurrentHashMap<String, Queue<ServiceMetricEventSnapshot>> 
metricEvents = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, Queue<ServiceMetricEvent>> 
metricEvents = new ConcurrentHashMap<>();
 
   public StubServiceEmitter()
   {
@@ -59,7 +59,7 @@ public class StubServiceEmitter extends ServiceEmitter 
implements MetricsVerifie
     if (event instanceof ServiceMetricEvent) {
       ServiceMetricEvent metricEvent = (ServiceMetricEvent) event;
       metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new 
ConcurrentLinkedDeque<>())
-                  .add(new ServiceMetricEventSnapshot(metricEvent));
+                  .add(metricEvent.copy());
     } else if (event instanceof AlertEvent) {
       alertEvents.add((AlertEvent) event);
     }
@@ -74,14 +74,20 @@ public class StubServiceEmitter extends ServiceEmitter 
implements MetricsVerifie
     return new ArrayList<>(events);
   }
 
+  public int getNumEmittedEvents()
+  {
+    return events.size();
+  }
+
   /**
-   * Gets all the metric events emitted since the previous {@link #flush()}.
+   * Gets all the metric events emitted for the given metric name since the 
previous {@link #flush()}.
    *
-   * @return Map from metric name to list of events emitted for that metric.
+   * @return List of events emitted for the given metric.
    */
-  public Map<String, Queue<ServiceMetricEventSnapshot>> getMetricEvents()
+  public List<ServiceMetricEvent> getMetricEvents(String metricName)
   {
-    return metricEvents;
+    final Queue<ServiceMetricEvent> metricEventQueue = 
metricEvents.get(metricName);
+    return metricEventQueue == null ? List.of() : 
List.copyOf(metricEventQueue);
   }
 
   /**
@@ -99,18 +105,18 @@ public class StubServiceEmitter extends ServiceEmitter 
implements MetricsVerifie
   )
   {
     final List<Number> values = new ArrayList<>();
-    final Queue<ServiceMetricEventSnapshot> events =
+    final Queue<ServiceMetricEvent> events =
         metricEvents.getOrDefault(metricName, new ArrayDeque<>());
     final Map<String, Object> filters =
         dimensionFilters == null ? Collections.emptyMap() : dimensionFilters;
-    for (ServiceMetricEventSnapshot event : events) {
+    for (ServiceMetricEvent event : events) {
       final Map<String, Object> userDims = event.getUserDims();
       boolean match = filters.keySet().stream()
                              .map(d -> filters.get(d).equals(userDims.get(d)))
                              .reduce((a, b) -> a && b)
                              .orElse(true);
       if (match) {
-        values.add(event.getMetricEvent().getValue());
+        values.add(event.getValue());
       }
     }
 
@@ -134,32 +140,4 @@ public class StubServiceEmitter extends ServiceEmitter 
implements MetricsVerifie
   public void close()
   {
   }
-
-  /**
-   * Helper class to encapsulate a ServiceMetricEvent and its user dimensions.
-   * Since {@link StubServiceEmitter} doesn't actually emit metrics and saves 
the emitted metrics in-memory,
-   * this helper class saves a copy of {@link ServiceMetricEvent#userDims} of 
emitted metrics
-   * via {@link ServiceMetricEvent#getUserDims()} as it can get mutated.
-   */
-  public static class ServiceMetricEventSnapshot
-  {
-    private final ServiceMetricEvent metricEvent;
-    private final Map<String, Object> userDims;
-
-    public ServiceMetricEventSnapshot(ServiceMetricEvent metricEvent)
-    {
-      this.metricEvent = metricEvent;
-      this.userDims = metricEvent.getUserDims();
-    }
-
-    public ServiceMetricEvent getMetricEvent()
-    {
-      return metricEvent;
-    }
-
-    public Map<String, Object> getUserDims()
-    {
-      return userDims;
-    }
-  }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java
 
b/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java
index 4647704fe27..32d9852d0c1 100644
--- 
a/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java
@@ -69,7 +69,7 @@ public class CPUTimeMetricQueryRunnerTest
     );
 
     Assert.assertEquals(expectedResults, results.toList());
-    Assert.assertEquals(1, emitter.getEvents().size());
+    Assert.assertEquals(1, emitter.getNumEmittedEvents());
 
     final Event event = Iterables.getOnlyElement(emitter.getEvents());
 
diff --git 
a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java 
b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
index ce7481c4e4a..f7cd6887e7e 100644
--- 
a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
@@ -138,7 +138,7 @@ public class DefaultQueryMetricsTest extends 
InitializedNullHandlingTest
     // Verify that Queried Segment Count does not get emitted by the 
DefaultQueryMetrics
     // and the total number of emitted metrics remains unchanged
     queryMetrics.reportQueriedSegmentCount(25).emit(serviceEmitter);
-    Assert.assertEquals(10, serviceEmitter.getEvents().size());
+    Assert.assertEquals(10, serviceEmitter.getNumEmittedEvents());
   }
 
   @Test
diff --git 
a/server/src/test/java/org/apache/druid/client/cache/MemcachedCacheTest.java 
b/server/src/test/java/org/apache/druid/client/cache/MemcachedCacheTest.java
index f87c7b679cc..e8209a444cb 100644
--- a/server/src/test/java/org/apache/druid/client/cache/MemcachedCacheTest.java
+++ b/server/src/test/java/org/apache/druid/client/cache/MemcachedCacheTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.client.cache;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -50,11 +49,9 @@ import org.apache.druid.guice.GuiceInjectors;
 import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.initialization.Initialization;
-import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.AbstractMonitor;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
@@ -206,16 +203,13 @@ public class MemcachedCacheTest extends 
CacheTestBase<MemcachedCache>
     final MemcachedCache cache = MemcachedCache.create(memcachedCacheConfig);
     final StubServiceEmitter serviceEmitter = new 
StubServiceEmitter("service", "host");
 
-    while (serviceEmitter.getEvents().isEmpty()) {
+    while (serviceEmitter.getNumEmittedEvents() <= 0) {
       Thread.sleep(memcachedCacheConfig.getTimeout());
       cache.doMonitor(serviceEmitter);
     }
 
-    Assert.assertFalse(serviceEmitter.getEvents().isEmpty());
-    ObjectMapper mapper = new DefaultObjectMapper();
-    for (Event event : serviceEmitter.getEvents()) {
-      log.debug("Found event `%s`", mapper.writeValueAsString(event.toMap()));
-    }
+    Assert.assertTrue(serviceEmitter.getNumEmittedEvents() > 0);
+    
Assert.assertFalse(serviceEmitter.getMetricEvents("query/cache/memcached/total").isEmpty());
   }
 
   @Test
diff --git 
a/server/src/test/java/org/apache/druid/curator/DruidConnectionStateListenerTest.java
 
b/server/src/test/java/org/apache/druid/curator/DruidConnectionStateListenerTest.java
index 86ff59c25ae..4a9b7a4e36f 100644
--- 
a/server/src/test/java/org/apache/druid/curator/DruidConnectionStateListenerTest.java
+++ 
b/server/src/test/java/org/apache/druid/curator/DruidConnectionStateListenerTest.java
@@ -60,7 +60,7 @@ public class DruidConnectionStateListenerTest
   public void test_doMonitor_init()
   {
     listener.doMonitor(emitter);
-    Assert.assertEquals(1, emitter.getEvents().size());
+    Assert.assertEquals(1, emitter.getNumEmittedEvents());
     emitter.verifyValue("zk/connected", 0);
   }
 
@@ -69,7 +69,7 @@ public class DruidConnectionStateListenerTest
   {
     listener.stateChanged(null, ConnectionState.CONNECTED);
     listener.doMonitor(emitter);
-    Assert.assertEquals(1, emitter.getEvents().size());
+    Assert.assertEquals(1, emitter.getNumEmittedEvents());
 
     emitter.verifyValue("zk/connected", 1);
   }
@@ -79,7 +79,7 @@ public class DruidConnectionStateListenerTest
   {
     listener.stateChanged(null, ConnectionState.SUSPENDED);
     listener.doMonitor(emitter);
-    Assert.assertEquals(2, emitter.getEvents().size()); // 2 because 
stateChanged emitted an alert
+    Assert.assertEquals(2, emitter.getNumEmittedEvents()); // 2 because 
stateChanged emitted an alert
 
     emitter.verifyValue("zk/connected", 0);
   }
@@ -88,7 +88,7 @@ public class DruidConnectionStateListenerTest
   public void test_suspendedAlert()
   {
     listener.stateChanged(null, ConnectionState.SUSPENDED);
-    Assert.assertEquals(1, emitter.getEvents().size());
+    Assert.assertEquals(1, emitter.getNumEmittedEvents());
 
     final AlertEvent alert = emitter.getAlerts().get(0);
     Assert.assertEquals("alerts", alert.getFeed());
@@ -99,10 +99,10 @@ public class DruidConnectionStateListenerTest
   public void test_reconnectedMetric()
   {
     listener.stateChanged(null, ConnectionState.SUSPENDED);
-    Assert.assertEquals(1, emitter.getEvents().size()); // the first 
stateChanged emits an alert
+    Assert.assertEquals(1, emitter.getNumEmittedEvents()); // the first 
stateChanged emits an alert
 
     listener.stateChanged(null, ConnectionState.RECONNECTED);
-    Assert.assertEquals(2, emitter.getEvents().size()); // the second 
stateChanged emits a metric
+    Assert.assertEquals(2, emitter.getNumEmittedEvents()); // the second 
stateChanged emits a metric
 
     long observedReconnectTime = emitter.getValue("zk/reconnect/time", 
null).longValue();
     Assert.assertTrue(observedReconnectTime >= 0);
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
index 463c4ed8c53..f7e2bb0b57e 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
@@ -78,7 +78,6 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -2279,17 +2278,15 @@ public class StreamAppenderatorTest extends 
InitializedNullHandlingTest
 
   private void verifySinkMetrics(StubServiceEmitter emitter, Set<String> 
segmentIds)
   {
-    Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> events = 
emitter.getMetricEvents();
     int segments = segmentIds.size();
-    Assert.assertEquals(4, events.size());
-    Assert.assertTrue(events.containsKey("query/cpu/time"));
-    Assert.assertEquals(segments, events.get("query/segment/time").size());
-    Assert.assertEquals(segments, 
events.get("query/segmentAndCache/time").size());
-    Assert.assertEquals(segments, events.get("query/wait/time").size());
+    emitter.verifyEmitted("query/cpu/time", 1);
+    Assert.assertEquals(segments, 
emitter.getMetricEvents("query/segment/time").size());
+    Assert.assertEquals(segments, 
emitter.getMetricEvents("query/segmentAndCache/time").size());
+    Assert.assertEquals(segments, 
emitter.getMetricEvents("query/wait/time").size());
     for (String id : segmentIds) {
-      
Assert.assertTrue(events.get("query/segment/time").stream().anyMatch(value -> 
value.getUserDims().containsValue(id)));
-      
Assert.assertTrue(events.get("query/segmentAndCache/time").stream().anyMatch(value
 -> value.getUserDims().containsValue(id)));
-      Assert.assertTrue(events.get("query/wait/time").stream().anyMatch(value 
-> value.getUserDims().containsValue(id)));
+      
Assert.assertTrue(emitter.getMetricEvents("query/segment/time").stream().anyMatch(value
 -> value.getUserDims().containsValue(id)));
+      
Assert.assertTrue(emitter.getMetricEvents("query/segmentAndCache/time").stream().anyMatch(value
 -> value.getUserDims().containsValue(id)));
+      
Assert.assertTrue(emitter.getMetricEvents("query/wait/time").stream().anyMatch(value
 -> value.getUserDims().containsValue(id)));
     }
   }
 
diff --git 
a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
 
b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
index 484ae809fc0..db1b97bce63 100644
--- 
a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
@@ -31,8 +31,8 @@ import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.java.util.emitter.core.EventMap;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.DataSource;
@@ -987,9 +987,7 @@ public class ClientQuerySegmentWalkerTest
         ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L})
     );
 
-    List<Event> events = emitter.getEvents();
-
-    for (Event event : events) {
+    for (ServiceMetricEvent event : 
emitter.getMetricEvents(ClientQuerySegmentWalker.ROWS_COUNT_METRIC)) {
       EventMap map = event.toMap();
       if 
(ClientQuerySegmentWalker.ROWS_COUNT_METRIC.equals(map.get("metric"))) {
         Assert.assertTrue(map.containsKey("host"));
@@ -1038,23 +1036,22 @@ public class ClientQuerySegmentWalkerTest
         ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L})
     );
 
-    List<Event> events = emitter.getEvents();
+    for (ServiceMetricEvent event : 
emitter.getMetricEvents(ClientQuerySegmentWalker.ROWS_COUNT_METRIC)) {
+      EventMap map = event.toMap();
+      Assert.assertTrue(map.containsKey("host"));
+      Assert.assertTrue(map.containsKey("service"));
+      Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID));
+      Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID));
+      Assert.assertEquals(3, map.get("value"));
+    }
 
-    for (Event event : events) {
+    for (ServiceMetricEvent event : 
emitter.getMetricEvents(ClientQuerySegmentWalker.BYTES_COUNT_METRIC)) {
       EventMap map = event.toMap();
-      if 
(ClientQuerySegmentWalker.ROWS_COUNT_METRIC.equals(map.get("metric"))) {
-        Assert.assertTrue(map.containsKey("host"));
-        Assert.assertTrue(map.containsKey("service"));
-        Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID));
-        Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID));
-        Assert.assertEquals(3, map.get("value"));
-      } else if 
(ClientQuerySegmentWalker.BYTES_COUNT_METRIC.equals(map.get("metric"))) {
-        Assert.assertTrue(map.containsKey("host"));
-        Assert.assertTrue(map.containsKey("service"));
-        Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID));
-        Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID));
-        Assert.assertEquals(43L, map.get("value"));
-      }
+      Assert.assertTrue(map.containsKey("host"));
+      Assert.assertTrue(map.containsKey("service"));
+      Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID));
+      Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID));
+      Assert.assertEquals(43L, map.get("value"));
     }
   }
 
diff --git 
a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java 
b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
index 3505eb943a2..650bf7d02cf 100644
--- 
a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
@@ -43,7 +43,6 @@ import org.mockito.junit.MockitoJUnitRunner;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.TreeMap;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -92,14 +91,10 @@ public class SQLAuditManagerTest
     final AuditEntry entry = createAuditEntry("testKey", "testType", 
DateTimes.nowUtc());
     auditManager.doAudit(entry);
 
-    Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> 
metricEvents = serviceEmitter.getMetricEvents();
-    Assert.assertEquals(1, metricEvents.size());
-
-    Queue<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = 
metricEvents.get("config/audit");
-    Assert.assertNotNull(auditMetricEvents);
+    List<ServiceMetricEvent> auditMetricEvents = 
serviceEmitter.getMetricEvents("config/audit");
     Assert.assertEquals(1, auditMetricEvents.size());
 
-    ServiceMetricEvent metric = auditMetricEvents.peek().getMetricEvent();
+    ServiceMetricEvent metric = auditMetricEvents.get(0);
 
     final AuditEntry dbEntry = lookupAuditEntryForKey("testKey");
     Assert.assertNotNull(dbEntry);
@@ -121,14 +116,12 @@ public class SQLAuditManagerTest
     Assert.assertEquals(entry, dbEntry);
 
     // Verify emitted metrics
-    Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> 
metricEvents = serviceEmitter.getMetricEvents();
-    Assert.assertEquals(1, metricEvents.size());
-
-    Queue<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = 
metricEvents.get("config/audit");
+    List<ServiceMetricEvent> auditMetricEvents
+        = serviceEmitter.getMetricEvents("config/audit");
     Assert.assertNotNull(auditMetricEvents);
     Assert.assertEquals(1, auditMetricEvents.size());
 
-    ServiceMetricEvent metric = auditMetricEvents.peek().getMetricEvent();
+    ServiceMetricEvent metric = auditMetricEvents.get(0);
     Assert.assertEquals(dbEntry.getKey(), metric.getUserDims().get("key"));
     Assert.assertEquals(dbEntry.getType(), metric.getUserDims().get("type"));
     Assert.assertNull(metric.getUserDims().get("payload"));
diff --git 
a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManagerTest.java
 
b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManagerTest.java
index c94d8e43668..735d19bd95f 100644
--- 
a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManagerTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManagerTest.java
@@ -142,7 +142,7 @@ public class LookupCoordinatorManagerTest
   @After
   public void tearDown()
   {
-    Assert.assertEquals(0, SERVICE_EMITTER.getEvents().size());
+    Assert.assertEquals(0, SERVICE_EMITTER.getNumEmittedEvents());
     SERVICE_EMITTER.flush();
   }
 
diff --git 
a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
 
b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
index f16acd0b060..aa5955cd914 100644
--- 
a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
@@ -31,13 +31,11 @@ import org.junit.Test;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
 
 public class GroupByStatsMonitorTest
 {
@@ -83,20 +81,15 @@ public class GroupByStatsMonitorTest
     emitter.flush();
     // Trigger metric emission
     monitor.doMonitor(emitter);
-    Map<String, Long> resultMap = emitter.getEvents()
-                                         .stream()
-                                         .collect(Collectors.toMap(
-                                             event -> (String) 
event.toMap().get("metric"),
-                                             event -> (Long) 
event.toMap().get("value")
-                                         ));
-    Assert.assertEquals(7, resultMap.size());
-    Assert.assertEquals(0, (long) 
resultMap.get("mergeBuffer/pendingRequests"));
-    Assert.assertEquals(0, (long) resultMap.get("mergeBuffer/used"));
-    Assert.assertEquals(1, (long) resultMap.get("mergeBuffer/queries"));
-    Assert.assertEquals(100, (long) 
resultMap.get("mergeBuffer/acquisitionTimeNs"));
-    Assert.assertEquals(2, (long) resultMap.get("groupBy/spilledQueries"));
-    Assert.assertEquals(200, (long) resultMap.get("groupBy/spilledBytes"));
-    Assert.assertEquals(300, (long) 
resultMap.get("groupBy/mergeDictionarySize"));
+
+    Assert.assertEquals(7, emitter.getNumEmittedEvents());
+    emitter.verifyValue("mergeBuffer/pendingRequests", 0L);
+    emitter.verifyValue("mergeBuffer/used", 0L);
+    emitter.verifyValue("mergeBuffer/queries", 1L);
+    emitter.verifyValue("mergeBuffer/acquisitionTimeNs", 100L);
+    emitter.verifyValue("groupBy/spilledQueries", 2L);
+    emitter.verifyValue("groupBy/spilledBytes", 200L);
+    emitter.verifyValue("groupBy/mergeDictionarySize", 300L);
   }
 
   @Test
diff --git 
a/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java
 
b/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java
index 92c2a1064e7..05865e0a2df 100644
--- 
a/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java
@@ -23,18 +23,15 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.client.DruidServerConfig;
 import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.server.SegmentManager;
 import org.apache.druid.server.coordination.SegmentLoadDropHandler;
 import org.apache.druid.timeline.DataSegment;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.List;
 import java.util.Map;
 
 public class HistoricalMetricsMonitorTest extends EasyMockSupport
@@ -95,54 +92,26 @@ public class HistoricalMetricsMonitorTest extends 
EasyMockSupport
     monitor.doMonitor(serviceEmitter);
     EasyMock.verify(druidServerConfig, segmentManager, segmentLoadDropMgr);
 
-    final List<Event> events = serviceEmitter.getEvents();
-
-    Assert.assertEquals(ImmutableMap.<String, Object>of(
-        "metric", "segment/max",
-        "value", maxSize
-    ), asMap(events.get(0)));
-
-    Assert.assertEquals(ImmutableMap.<String, Object>of(
-        "dataSource", dataSource,
-        "metric", "segment/pendingDelete",
-        "priority", String.valueOf(priority),
-        "tier", tier,
-        "value", dataSegment.getSize()
-    ), asMap(events.get(1)));
-
-    Assert.assertEquals(ImmutableMap.<String, Object>of(
-        "metric", "segment/used",
-        "value", dataSegment.getSize(),
-        "tier", tier,
-        "priority", String.valueOf(priority),
-        "dataSource", dataSource
-    ), asMap(events.get(2)));
-
-    Assert.assertEquals(ImmutableMap.<String, Object>of(
-        "metric", "segment/usedPercent",
-        "value", dataSegment.getSize() * 1.0D / maxSize,
-        "tier", tier,
-        "priority", String.valueOf(priority),
-        "dataSource", dataSource
-    ), asMap(events.get(3)));
-
-    Assert.assertEquals(ImmutableMap.<String, Object>of(
-        "metric", "segment/count",
-        "value", 1L,
-        "tier", tier,
-        "priority", String.valueOf(priority),
-        "dataSource", dataSource
-    ), asMap(events.get(4)));
-  }
-
-  private Map<String, Object> asMap(Event event)
-  {
-    final Map<String, Object> map = event.toMap();
-    Assert.assertNotNull(map.remove("feed"));
-    Assert.assertNotNull(map.remove("timestamp"));
-    Assert.assertNotNull(map.remove("service"));
-    Assert.assertNotNull(map.remove("host"));
-
-    return map;
+    serviceEmitter.verifyValue("segment/max", maxSize);
+    serviceEmitter.verifyValue(
+        "segment/pendingDelete",
+        Map.of("tier", tier, "dataSource", dataSource, "priority", 
String.valueOf(priority)),
+        dataSegment.getSize()
+    );
+    serviceEmitter.verifyValue(
+        "segment/used",
+        Map.of("tier", tier, "priority", String.valueOf(priority), 
"dataSource", dataSource),
+        dataSegment.getSize()
+    );
+    serviceEmitter.verifyValue(
+        "segment/usedPercent",
+        Map.of("tier", tier, "priority", String.valueOf(priority), 
"dataSource", dataSource),
+        dataSegment.getSize() * 1.0D / maxSize
+    );
+    serviceEmitter.verifyValue(
+        "segment/count",
+        Map.of("tier", tier, "priority", String.valueOf(priority), 
"dataSource", dataSource),
+        1L
+    );
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java 
b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
index aeed3c40ea9..ef78930f65a 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
@@ -20,7 +20,6 @@
 package org.apache.druid.server.metrics;
 
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
@@ -34,10 +33,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Predicate;
 import java.util.function.UnaryOperator;
 
@@ -55,47 +53,54 @@ public class LatchableEmitter extends StubServiceEmitter
 
   public static final String TYPE = "latching";
 
+  private final Set<WaitCondition> waitConditions = new HashSet<>();
+
+  private final ReentrantLock eventProcessingLock = new ReentrantLock();
+
   /**
-   * Single-threaded executor to evaluate conditions.
+   * List of events that have already been processed by {@link 
#evaluateWaitConditions(Event)}.
    */
-  private final ScheduledExecutorService conditionEvaluateExecutor;
-  private final Set<WaitCondition> waitConditions = new HashSet<>();
-  private final ReentrantReadWriteLock eventReadWriteLock = new 
ReentrantReadWriteLock(true);
+  private final List<Event> processedEvents = new ArrayList<>();
 
   /**
    * Creates a {@link StubServiceEmitter} that may be used in embedded tests.
    */
-  public LatchableEmitter(String service, String host, 
ScheduledExecutorFactory executorFactory)
+  public LatchableEmitter(String service, String host)
   {
     super(service, host);
-    this.conditionEvaluateExecutor = executorFactory.create(1, 
"LatchingEmitter-eval-%d");
   }
 
   @Override
   public void emit(Event event)
   {
     super.emit(event);
-    triggerConditionEvaluations();
+    evaluateWaitConditions(event);
   }
 
   @Override
   public void flush()
   {
-    // flush() or close() is typically not called in tests until the test is 
complete
-    // but acquire a lock all the same for the sake of completeness
-    eventReadWriteLock.writeLock().lock();
+    eventProcessingLock.lock();
     try {
       super.flush();
+      processedEvents.clear();
     }
     finally {
-      eventReadWriteLock.writeLock().unlock();
+      eventProcessingLock.unlock();
     }
   }
 
   @Override
   public void close()
   {
-    flush();
+    eventProcessingLock.lock();
+    try {
+      super.close();
+      processedEvents.clear();
+    }
+    finally {
+      eventProcessingLock.unlock();
+    }
   }
 
   /**
@@ -107,9 +112,8 @@ public class LatchableEmitter extends StubServiceEmitter
   public void waitForEvent(Predicate<Event> condition, long timeoutMillis)
   {
     final WaitCondition waitCondition = new WaitCondition(condition);
-    waitConditions.add(waitCondition);
+    registerWaitCondition(waitCondition);
 
-    triggerConditionEvaluations();
     try {
       final long awaitTime = timeoutMillis >= 0 ? timeoutMillis : 
Long.MAX_VALUE;
       if (!waitCondition.countDownLatch.await(awaitTime, 
TimeUnit.MILLISECONDS)) {
@@ -158,22 +162,12 @@ public class LatchableEmitter extends StubServiceEmitter
     );
   }
 
-  private void triggerConditionEvaluations()
-  {
-    if (conditionEvaluateExecutor == null) {
-      throw new ISE("Cannot evaluate conditions as the 
'conditionEvaluateExecutor' is null.");
-    } else {
-      conditionEvaluateExecutor.submit(this::evaluateWaitConditions);
-    }
-  }
-
   /**
-   * Evaluates wait conditions. This method must be invoked on the
-   * {@link #conditionEvaluateExecutor} so that it does not block {@link 
#emit(Event)}.
+   * Evaluates wait conditions for the given event.
    */
-  private void evaluateWaitConditions()
+  private void evaluateWaitConditions(Event event)
   {
-    eventReadWriteLock.readLock().lock();
+    eventProcessingLock.lock();
     try {
       // Create a copy of the conditions for thread-safety
       final List<WaitCondition> conditionsToEvaluate = 
List.copyOf(waitConditions);
@@ -181,25 +175,46 @@ public class LatchableEmitter extends StubServiceEmitter
         return;
       }
 
-      List<Event> events = getEvents();
       for (WaitCondition condition : conditionsToEvaluate) {
-        final int currentNumberOfEvents = events.size();
-
-        // Do not use an iterator over the list to avoid concurrent modification exceptions
-        // Evaluate new events against this condition
-        for (int i = condition.processedUntil; i < currentNumberOfEvents; ++i) {
-          if (condition.predicate.test(events.get(i))) {
-            condition.countDownLatch.countDown();
-          }
+        if (condition.predicate.test(event)) {
+          condition.countDownLatch.countDown();
+        }
+      }
+    }
+    catch (Exception e) {
+      log.error(e, "Error while evaluating wait conditions for event[%s]", event.toMap());
+      throw new ISE(e, "Error while evaluating wait conditions for event[%s]", event.toMap());
+    }
+    finally {
+      processedEvents.add(event);
+      eventProcessingLock.unlock();
+    }
+  }
+
+  /**
+   * Evaluates the given new condition for all past events and then adds it to
+   * {@link #waitConditions}.
+   */
+  private void registerWaitCondition(WaitCondition condition)
+  {
+    eventProcessingLock.lock();
+    try {
+      for (Event event : processedEvents) {
+        if (condition.predicate.test(event)) {
+          condition.countDownLatch.countDown();
+          break;
         }
-        condition.processedUntil = currentNumberOfEvents;
+      }
+
+      if (condition.countDownLatch.getCount() > 0) {
+        waitConditions.add(condition);
       }
     }
     catch (Exception e) {
-      log.error(e, "Error while evaluating wait conditions");
+      throw new ISE(e, "Error while evaluating condition");
     }
     finally {
-      eventReadWriteLock.readLock().unlock();
+      eventProcessingLock.unlock();
     }
   }
 
@@ -208,8 +223,6 @@ public class LatchableEmitter extends StubServiceEmitter
     private final Predicate<Event> predicate;
     private final CountDownLatch countDownLatch;
 
-    private int processedUntil;
-
     private WaitCondition(Predicate<Event> predicate)
     {
       this.predicate = predicate;
diff --git a/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java
index 58532d290d2..6f6377740a7 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java
@@ -31,10 +31,8 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.stream.Collectors;
 
 public class QueryCountStatsMonitorTest
 {
@@ -111,18 +109,12 @@ public class QueryCountStatsMonitorTest
     emitter.flush();
     // Trigger metric emission
     monitor.doMonitor(emitter);
-    Map<String, Long> resultMap = emitter.getEvents()
-                                         .stream()
-                                         .collect(Collectors.toMap(
-                                             event -> (String) event.toMap().get("metric"),
-                                             event -> (Long) event.toMap().get("value")
-                                         ));
-    Assert.assertEquals(5, resultMap.size());
-    Assert.assertEquals(1L, (long) resultMap.get("query/success/count"));
-    Assert.assertEquals(2L, (long) resultMap.get("query/failed/count"));
-    Assert.assertEquals(3L, (long) resultMap.get("query/interrupted/count"));
-    Assert.assertEquals(4L, (long) resultMap.get("query/timeout/count"));
-    Assert.assertEquals(10L, (long) resultMap.get("query/count"));
+    Assert.assertEquals(5, emitter.getNumEmittedEvents());
+    emitter.verifyValue("query/success/count", 1L);
+    emitter.verifyValue("query/failed/count", 2L);
+    emitter.verifyValue("query/interrupted/count", 3L);
+    emitter.verifyValue("query/timeout/count", 4L);
+    emitter.verifyValue("query/count", 10L);
   }
 
   @Test
@@ -137,18 +129,13 @@ public class QueryCountStatsMonitorTest
     emitter.flush();
     // Trigger metric emission
     monitor.doMonitor(emitter);
-    Map<String, Long> resultMap = emitter.getEvents()
-                                         .stream()
-                                         .collect(Collectors.toMap(
-                                             event -> (String) event.toMap().get("metric"),
-                                             event -> (Long) event.toMap().get("value")
-                                         ));
-    Assert.assertEquals(6, resultMap.size());
-    Assert.assertEquals(0, (long) resultMap.get("mergeBuffer/pendingRequests"));
-    Assert.assertEquals(1L, (long) resultMap.get("query/success/count"));
-    Assert.assertEquals(2L, (long) resultMap.get("query/failed/count"));
-    Assert.assertEquals(3L, (long) resultMap.get("query/interrupted/count"));
-    Assert.assertEquals(4L, (long) resultMap.get("query/timeout/count"));
-    Assert.assertEquals(10L, (long) resultMap.get("query/count"));
+
+    Assert.assertEquals(6, emitter.getNumEmittedEvents());
+    emitter.verifyValue("mergeBuffer/pendingRequests", 0L);
+    emitter.verifyValue("query/success/count", 1L);
+    emitter.verifyValue("query/failed/count", 2L);
+    emitter.verifyValue("query/interrupted/count", 3L);
+    emitter.verifyValue("query/timeout/count", 4L);
+    emitter.verifyValue("query/count", 10L);
   }
 }
diff --git a/server/src/test/java/org/apache/druid/server/metrics/ServiceStatusMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/ServiceStatusMonitorTest.java
index 88acb6dca26..748672a12b4 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/ServiceStatusMonitorTest.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/ServiceStatusMonitorTest.java
@@ -32,13 +32,15 @@ public class ServiceStatusMonitorTest
 {
 
   private ServiceStatusMonitor monitor;
+  private StubServiceEmitter emitter;
   private Map<String, Object> heartbeatTags;
-  private Supplier<Map<String, Object>> heartbeatTagsSupplier = () -> heartbeatTags;
-  private static String HEARTBEAT_METRIC_KEY = "service/heartbeat";
+  private final Supplier<Map<String, Object>> heartbeatTagsSupplier = () -> heartbeatTags;
+  private static final String HEARTBEAT_METRIC_KEY = "service/heartbeat";
 
   @Before
   public void setUp()
   {
+    emitter = new StubServiceEmitter();
     monitor = new ServiceStatusMonitor();
     heartbeatTags = new HashMap<>();
     monitor.heartbeatTagsSupplier = heartbeatTagsSupplier;
@@ -47,23 +49,18 @@ public class ServiceStatusMonitorTest
   @Test
   public void testDefaultHeartbeatReported()
   {
-    final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
     monitor.doMonitor(emitter);
-    Assert.assertEquals(1, emitter.getEvents().size());
-    Assert.assertEquals(HEARTBEAT_METRIC_KEY, emitter.getEvents().get(0).toMap().get("metric"));
-    Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("value"));
+    Assert.assertEquals(1, emitter.getNumEmittedEvents());
+    emitter.verifyValue(HEARTBEAT_METRIC_KEY, 1);
   }
 
   @Test
   public void testLeaderTag()
   {
     heartbeatTags.put("leader", 1);
-    final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
     monitor.doMonitor(emitter);
-    Assert.assertEquals(1, emitter.getEvents().size());
-    Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("leader"));
-    Assert.assertEquals(HEARTBEAT_METRIC_KEY, emitter.getEvents().get(0).toMap().get("metric"));
-    Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("value"));
+    Assert.assertEquals(1, emitter.getNumEmittedEvents());
+    emitter.verifyValue(HEARTBEAT_METRIC_KEY, Map.of("leader", 1), 1);
   }
 
   @Test
@@ -71,12 +68,8 @@ public class ServiceStatusMonitorTest
   {
     heartbeatTags.put("leader", 1);
     heartbeatTags.put("taskRunner", "http");
-    final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
     monitor.doMonitor(emitter);
-    Assert.assertEquals(1, emitter.getEvents().size());
-    Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("leader"));
-    Assert.assertEquals("http", emitter.getEvents().get(0).toMap().get("taskRunner"));
-    Assert.assertEquals(HEARTBEAT_METRIC_KEY, emitter.getEvents().get(0).toMap().get("metric"));
-    Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("value"));
+    Assert.assertEquals(1, emitter.getNumEmittedEvents());
+    emitter.verifyValue(HEARTBEAT_METRIC_KEY, Map.of("leader", 1, "taskRunner", "http"), 1);
   }
 }
diff --git a/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
index 93687bc1be8..2489d033411 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
@@ -94,7 +94,7 @@ public class TaskCountStatsMonitorTest
     final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
     monitor.doMonitor(emitter);
 
-    Assert.assertEquals(9, emitter.getEvents().size());
+    Assert.assertEquals(9, emitter.getNumEmittedEvents());
 
     emitter.verifyValue("task/success/count", Map.of("dataSource", "d1", "taskType", "index"), 1L);
     emitter.verifyValue("task/failed/count", Map.of("dataSource", "d1", "taskType", "index"), 1L);
diff --git a/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java
index 0fed1c9b6bc..092a7b66d01 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java
@@ -74,7 +74,7 @@ public class TaskSlotCountStatsMonitorTest
     final TaskSlotCountStatsMonitor monitor = new TaskSlotCountStatsMonitor(statsProvider);
     final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
     monitor.doMonitor(emitter);
-    Assert.assertEquals(5, emitter.getEvents().size());
+    Assert.assertEquals(5, emitter.getNumEmittedEvents());
     emitter.verifyValue("taskSlot/total/count", 1L);
     emitter.verifyValue("taskSlot/idle/count", 1L);
     emitter.verifyValue("taskSlot/used/count", 1L);
diff --git a/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java
index a1930bef637..05e10ab0142 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java
@@ -222,7 +222,7 @@ public class WorkerTaskCountStatsMonitorTest
         new WorkerTaskCountStatsMonitor(injectorForMiddleManager, ImmutableSet.of(NodeRole.MIDDLE_MANAGER));
     final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
     monitor.doMonitor(emitter);
-    Assert.assertEquals(5, emitter.getEvents().size());
+    Assert.assertEquals(5, emitter.getNumEmittedEvents());
     emitter.verifyValue(
         "worker/task/failed/count",
         ImmutableMap.of("category", "workerCategory", "workerVersion", "workerVersion"),
@@ -257,7 +257,7 @@ public class WorkerTaskCountStatsMonitorTest
         new WorkerTaskCountStatsMonitor(injectorForIndexer, ImmutableSet.of(NodeRole.INDEXER));
     final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
     monitor.doMonitor(emitter);
-    Assert.assertEquals(10, emitter.getEvents().size());
+    Assert.assertEquals(10, emitter.getNumEmittedEvents());
     emitter.verifyValue(
         "worker/task/running/count",
         ImmutableMap.of("dataSource", "wikipedia"),
@@ -317,7 +317,7 @@ public class WorkerTaskCountStatsMonitorTest
         new WorkerTaskCountStatsMonitor(injectorForMiddleManagerNullStats, ImmutableSet.of(NodeRole.MIDDLE_MANAGER));
     final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
     monitor.doMonitor(emitter);
-    Assert.assertEquals(0, emitter.getEvents().size());
+    Assert.assertEquals(0, emitter.getNumEmittedEvents());
   }
 
   @Test
@@ -327,6 +327,6 @@ public class WorkerTaskCountStatsMonitorTest
             new WorkerTaskCountStatsMonitor(injectorForPeon, ImmutableSet.of(NodeRole.PEON));
     final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
     monitor.doMonitor(emitter);
-    Assert.assertEquals(0, emitter.getEvents().size());
+    Assert.assertEquals(0, emitter.getNumEmittedEvents());
   }
 }
diff --git a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
index 6bcf87fdc4a..f974d361b1f 100644
--- a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
+++ b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
@@ -683,7 +683,8 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
     }
     catch (NullPointerException ignored) {
     }
-    Assert.assertEquals("query/time", stubServiceEmitter.getEvents().get(0).toMap().get("metric"));
+    // Assert.assertEquals("query/time", stubServiceEmitter.getEvents().get(0).toMap().get("metric"));
+    stubServiceEmitter.verifyEmitted("query/time", 1);
     if (!isJDBCSql) {
       Assert.assertEquals("dummy", stubServiceEmitter.getEvents().get(0).toMap().get("id"));
     }
diff --git a/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java b/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java
index 1333164449f..71c228a96e5 100644
--- a/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java
+++ b/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java
@@ -26,7 +26,6 @@ import com.google.inject.name.Names;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.initialization.DruidModule;
-import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
 import org.apache.druid.java.util.emitter.core.Emitter;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.metrics.LatchableEmitter;
@@ -48,10 +47,9 @@ public class LatchableEmitterModule implements DruidModule
   @Provides
   @ManageLifecycle
   public LatchableEmitter makeEmitter(
-      @Self DruidNode selfNode,
-      ScheduledExecutorFactory executorFactory
+      @Self DruidNode selfNode
   )
   {
-    return new LatchableEmitter(selfNode.getServiceName(), selfNode.getHost(), executorFactory);
+    return new LatchableEmitter(selfNode.getServiceName(), selfNode.getHost());
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to