This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7aec41b8feb Minor clean up of LatchableEmitter / StubServiceEmitter
(#18255)
7aec41b8feb is described below
commit 7aec41b8feb51efc8ce286b6b136b12c80537a22
Author: Kashif Faraz <[email protected]>
AuthorDate: Wed Jul 16 19:43:12 2025 +0530
Minor clean up of LatchableEmitter / StubServiceEmitter (#18255)
Follow up to #18249
Changes:
- Maintain a List of processed events in `LatchableEmitter`.
This is an improvement over the current flow where a copy of events is
created upon receiving every new event.
- When a new condition is registered, evaluate all past events upfront,
then add it to the set of wait conditions
- Evaluate each new event as it is received
Other changes:
- Hide the internal queue implementation of `StubServiceEmitter` from tests
and sub-classes
- Reduce the usage of `StubServiceEmitter.getEvents()`. Use the inbuilt
`verifyValue` methods instead.
---
.../overlord/common/KubernetesPeonClientTest.java | 2 +-
.../overlord/duty/UnusedSegmentsKillerTest.java | 5 +-
.../SeekableStreamSupervisorSpecTest.java | 9 +-
.../SeekableStreamSupervisorStateTest.java | 2 +-
.../worker/shuffle/ShuffleMonitorTest.java | 29 ++----
.../util/emitter/service/ServiceMetricEvent.java | 8 ++
.../emitter/service/ServiceMetricEventTest.java | 29 +++++-
.../java/util/metrics/CgroupCpuSetMonitorTest.java | 23 ++---
.../java/util/metrics/CgroupDiskMonitorTest.java | 4 +-
.../java/util/metrics/CgroupMemoryMonitorTest.java | 5 +-
.../java/util/metrics/CgroupV2CpuMonitorTest.java | 5 +-
.../java/util/metrics/CgroupV2DiskMonitorTest.java | 4 +-
.../java/util/metrics/CpuAcctDeltaMonitorTest.java | 4 +-
.../util/metrics/HttpPostEmitterMonitorTest.java | 39 +++-----
.../java/util/metrics/OshiSysMonitorTest.java | 26 +++---
.../java/util/metrics/StubServiceEmitter.java | 52 +++--------
.../druid/query/CPUTimeMetricQueryRunnerTest.java | 2 +-
.../druid/query/DefaultQueryMetricsTest.java | 2 +-
.../druid/client/cache/MemcachedCacheTest.java | 12 +--
.../curator/DruidConnectionStateListenerTest.java | 12 +--
.../appenderator/StreamAppenderatorTest.java | 17 ++--
.../druid/server/ClientQuerySegmentWalkerTest.java | 35 ++++---
.../druid/server/audit/SQLAuditManagerTest.java | 17 +---
.../lookup/cache/LookupCoordinatorManagerTest.java | 2 +-
.../server/metrics/GroupByStatsMonitorTest.java | 25 ++---
.../metrics/HistoricalMetricsMonitorTest.java | 73 +++++----------
.../druid/server/metrics/LatchableEmitter.java | 101 ++++++++++++---------
.../server/metrics/QueryCountStatsMonitorTest.java | 41 +++------
.../server/metrics/ServiceStatusMonitorTest.java | 27 ++----
.../server/metrics/TaskCountStatsMonitorTest.java | 2 +-
.../metrics/TaskSlotCountStatsMonitorTest.java | 2 +-
.../metrics/WorkerTaskCountStatsMonitorTest.java | 8 +-
.../server/AsyncQueryForwardingServletTest.java | 3 +-
.../embedded/emitter/LatchableEmitterModule.java | 6 +-
34 files changed, 270 insertions(+), 363 deletions(-)
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
index 4d4e1cd9a1b..9c831217914 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
@@ -93,7 +93,7 @@ public class KubernetesPeonClientTest
Pod peonPod = instance.launchPeonJobAndWaitForStart(job,
NoopTask.create(), 1, TimeUnit.SECONDS);
Assertions.assertNotNull(peonPod);
- Assertions.assertEquals(1, serviceEmitter.getEvents().size());
+ Assertions.assertEquals(1, serviceEmitter.getNumEmittedEvents());
}
@Test
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java
index 111489a74dc..0cc5f6f5f23 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.indexing.test.TestDataSegmentKiller;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.UnusedSegmentKillerConfig;
@@ -52,7 +53,6 @@ import org.junit.Rule;
import org.junit.Test;
import java.util.List;
-import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
@@ -329,8 +329,7 @@ public class UnusedSegmentsKillerTest
emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10);
// Verify that the kill intervals are sorted with the oldest interval first
- final Queue<StubServiceEmitter.ServiceMetricEventSnapshot> events =
- emitter.getMetricEvents().get(TaskMetrics.RUN_DURATION);
+ final List<ServiceMetricEvent> events =
emitter.getMetricEvents(TaskMetrics.RUN_DURATION);
final List<Interval> killIntervals = events.stream().map(event -> {
final String taskId = (String)
event.getUserDims().get(DruidMetrics.TASK_ID);
String[] splits = taskId.split("_");
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index cfd49994262..a663c7c19aa 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -780,8 +780,7 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
Assert.assertEquals(2, taskCountAfterScaleOut);
Assert.assertTrue(
dynamicActionEmitter
- .getMetricEvents()
- .get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
+
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
.stream()
.map(metric ->
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
.filter(Objects::nonNull)
@@ -840,8 +839,7 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
Assert.assertTrue(
dynamicActionEmitter
- .getMetricEvents()
- .get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
+
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
.stream()
.map(metric ->
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
.filter(Objects::nonNull)
@@ -1103,8 +1101,7 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
Assert.assertTrue(
dynamicActionEmitter
- .getMetricEvents()
- .get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
+
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
.stream()
.map(metric ->
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
.filter(Objects::nonNull)
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 4ae5a44c1c8..feba656ee63 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -2521,7 +2521,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
latch.await();
supervisor.emitLag();
- Assert.assertEquals(0, emitter.getEvents().size());
+ Assert.assertEquals(0, emitter.getNumEmittedEvents());
}
private void validateSupervisorStateAfterResetOffsets(
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitorTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitorTest.java
index 1174bc842ed..6ee98a8d309 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitorTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitorTest.java
@@ -21,14 +21,12 @@ package org.apache.druid.indexing.worker.shuffle;
import com.google.common.collect.ImmutableMap;
import
org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics;
-import org.apache.druid.java.util.emitter.core.Event;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
-import java.util.List;
+import java.util.Map;
public class ShuffleMonitorTest
{
@@ -46,23 +44,16 @@ public class ShuffleMonitorTest
final ShuffleMonitor monitor = new ShuffleMonitor();
monitor.setShuffleMetrics(shuffleMetrics);
Assert.assertTrue(monitor.doMonitor(emitter));
- final List<Event> events = emitter.getEvents();
- Assert.assertEquals(2, events.size());
- Assert.assertSame(ServiceMetricEvent.class, events.get(0).getClass());
- ServiceMetricEvent event = (ServiceMetricEvent) events.get(0);
- Assert.assertEquals(ShuffleMonitor.SHUFFLE_BYTES_KEY, event.getMetric());
- Assert.assertEquals(310L, event.getValue());
- Assert.assertEquals(
- ImmutableMap.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION,
"supervisor"),
- event.getUserDims()
+ Assert.assertEquals(2, emitter.getNumEmittedEvents());
+ emitter.verifyValue(
+ ShuffleMonitor.SHUFFLE_BYTES_KEY,
+ Map.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, "supervisor"),
+ 310L
);
- Assert.assertSame(ServiceMetricEvent.class, events.get(1).getClass());
- event = (ServiceMetricEvent) events.get(1);
- Assert.assertEquals(ShuffleMonitor.SHUFFLE_REQUESTS_KEY,
event.getMetric());
- Assert.assertEquals(3, event.getValue());
- Assert.assertEquals(
- ImmutableMap.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION,
"supervisor"),
- event.getUserDims()
+ emitter.verifyValue(
+ ShuffleMonitor.SHUFFLE_REQUESTS_KEY,
+ Map.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, "supervisor"),
+ 3
);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java
b/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java
index 3bab9a3ad04..f7f3549ecf7 100644
---
a/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java
+++
b/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java
@@ -125,6 +125,14 @@ public class ServiceMetricEvent implements Event
.build();
}
+ /**
+ * Creates an immutable copy of this metric event. This is used only in
tests.
+ */
+ public ServiceMetricEvent copy()
+ {
+ return new ServiceMetricEvent(createdTime, serviceDims,
Map.copyOf(userDims), feed, metric, value);
+ }
+
/**
* Builder for a {@link ServiceMetricEvent}. This builder can be used for
* building only one event.
diff --git
a/processing/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java
b/processing/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java
index 5fbf4dc9c9e..cf59f0f3c40 100644
---
a/processing/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java
+++
b/processing/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java
@@ -35,7 +35,7 @@ import java.util.Map;
public class ServiceMetricEventTest
{
@Test
- public void testStupidTest()
+ public void testBuilder()
{
ServiceMetricEvent builderEvent = new ServiceMetricEvent.Builder()
.setDimension("user1", "a")
@@ -317,4 +317,31 @@ public class ServiceMetricEventTest
Assert.assertTrue(target.getUserDims().isEmpty());
Assert.assertNull(target.getUserDims().get("userDimMap"));
}
+
+ @Test
+ public void test_copy_returnsAnImmutableInstance()
+ {
+ final ServiceMetricEvent.Builder eventBuilder = ServiceMetricEvent
+ .builder()
+ .setDimension("dim1", "v1")
+ .setMetric("m1", 100);
+
+ final ServiceMetricEvent event1 = eventBuilder.build("coordinator",
"localhost");
+ final ServiceMetricEvent event1Copy = event1.copy();
+
+ Assert.assertEquals(Map.of("dim1", "v1"), event1.getUserDims());
+ Assert.assertEquals(Map.of("dim1", "v1"), event1Copy.getUserDims());
+
+ final ServiceMetricEvent event2 = eventBuilder
+ .setDimension("dim2", "v2")
+ .setMetric("m2", 200)
+ .build("coordinator", "localhost");
+
+ // Verify that the original event gets changed dimensions
+ Assert.assertEquals(Map.of("dim1", "v1", "dim2", "v2"),
event2.getUserDims());
+ Assert.assertEquals(Map.of("dim1", "v1", "dim2", "v2"),
event1.getUserDims());
+
+ // But the event copy still has the original dimensions
+ Assert.assertEquals(Map.of("dim1", "v1"), event1Copy.getUserDims());
+ }
}
diff --git
a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java
b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java
index 027bd995b33..b18aa138fa5 100644
---
a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java
+++
b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java
@@ -21,7 +21,6 @@ package org.apache.druid.java.util.metrics;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.FileUtils;
-import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.ProcCgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.TestUtils;
@@ -34,8 +33,6 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
-import java.util.List;
-import java.util.Map;
public class CgroupCpuSetMonitorTest
{
@@ -72,19 +69,11 @@ public class CgroupCpuSetMonitorTest
final CgroupCpuSetMonitor monitor = new CgroupCpuSetMonitor(discoverer,
ImmutableMap.of(), "some_feed");
final StubServiceEmitter emitter = new StubServiceEmitter("service",
"host");
Assert.assertTrue(monitor.doMonitor(emitter));
- final List<Event> actualEvents = emitter.getEvents();
- Assert.assertEquals(4, actualEvents.size());
- final Map<String, Object> cpusEvent = actualEvents.get(0).toMap();
- final Map<String, Object> effectiveCpusEvent = actualEvents.get(1).toMap();
- final Map<String, Object> memsEvent = actualEvents.get(2).toMap();
- final Map<String, Object> effectiveMemsEvent = actualEvents.get(3).toMap();
- Assert.assertEquals("cgroup/cpuset/cpu_count", cpusEvent.get("metric"));
- Assert.assertEquals(8, cpusEvent.get("value"));
- Assert.assertEquals("cgroup/cpuset/effective_cpu_count",
effectiveCpusEvent.get("metric"));
- Assert.assertEquals(7, effectiveCpusEvent.get("value"));
- Assert.assertEquals("cgroup/cpuset/mems_count", memsEvent.get("metric"));
- Assert.assertEquals(4, memsEvent.get("value"));
- Assert.assertEquals("cgroup/cpuset/effective_mems_count",
effectiveMemsEvent.get("metric"));
- Assert.assertEquals(1, effectiveMemsEvent.get("value"));
+ Assert.assertEquals(4, emitter.getNumEmittedEvents());
+
+ emitter.verifyValue("cgroup/cpuset/cpu_count", 8);
+ emitter.verifyValue("cgroup/cpuset/effective_cpu_count", 7);
+ emitter.verifyValue("cgroup/cpuset/mems_count", 4);
+ emitter.verifyValue("cgroup/cpuset/effective_mems_count", 1);
}
}
diff --git
a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java
b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java
index 21a83458729..d75af320f12 100644
---
a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java
+++
b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java
@@ -68,13 +68,13 @@ public class CgroupDiskMonitorTest
final CgroupDiskMonitor monitor = new CgroupDiskMonitor(discoverer,
ImmutableMap.of(), "some_feed");
final StubServiceEmitter emitter = new StubServiceEmitter("service",
"host");
Assert.assertTrue(monitor.doMonitor(emitter));
- Assert.assertEquals(0, emitter.getEvents().size());
+ Assert.assertEquals(0, emitter.getNumEmittedEvents());
TestUtils.copyOrReplaceResource("/blkio.throttle.io_service_bytes-2",
serviceBytesFile);
TestUtils.copyOrReplaceResource("/blkio.throttle.io_serviced-2",
servicedFile);
Assert.assertTrue(monitor.doMonitor(emitter));
- Assert.assertEquals(8, emitter.getEvents().size());
+ Assert.assertEquals(8, emitter.getNumEmittedEvents());
Assert.assertTrue(
emitter
.getEvents()
diff --git
a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java
b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java
index 7827368ec9f..6538cb9691a 100644
---
a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java
+++
b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java
@@ -21,7 +21,6 @@ package org.apache.druid.java.util.metrics;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.FileUtils;
-import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.ProcCgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.TestUtils;
@@ -34,7 +33,6 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
-import java.util.List;
public class CgroupMemoryMonitorTest
{
@@ -71,7 +69,6 @@ public class CgroupMemoryMonitorTest
final CgroupMemoryMonitor monitor = new CgroupMemoryMonitor(discoverer,
ImmutableMap.of(), "some_feed");
final StubServiceEmitter emitter = new StubServiceEmitter("service",
"host");
Assert.assertTrue(monitor.doMonitor(emitter));
- final List<Event> actualEvents = emitter.getEvents();
- Assert.assertEquals(46, actualEvents.size());
+ Assert.assertEquals(46, emitter.getNumEmittedEvents());
}
}
diff --git
a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java
b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java
index 7006de1edb0..07682643148 100644
---
a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java
+++
b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java
@@ -20,7 +20,6 @@
package org.apache.druid.java.util.metrics;
import com.google.common.collect.ImmutableSet;
-import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.ProcCgroupV2Discoverer;
import org.apache.druid.java.util.metrics.cgroups.TestUtils;
@@ -33,7 +32,6 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
-import java.util.List;
import java.util.stream.Collectors;
public class CgroupV2CpuMonitorTest
@@ -65,8 +63,7 @@ public class CgroupV2CpuMonitorTest
final CgroupV2CpuMonitor monitor = new CgroupV2CpuMonitor(discoverer);
final StubServiceEmitter emitter = new StubServiceEmitter("service",
"host");
Assert.assertTrue(monitor.doMonitor(emitter));
- final List<Event> actualEvents = emitter.getEvents();
- Assert.assertEquals(0, actualEvents.size());
+ Assert.assertEquals(0, emitter.getNumEmittedEvents());
emitter.flush();
diff --git
a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java
b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java
index 40201a30feb..f9a977edd92 100644
---
a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java
+++
b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java
@@ -61,14 +61,14 @@ public class CgroupV2DiskMonitorTest
final CgroupV2DiskMonitor monitor = new CgroupV2DiskMonitor(discoverer);
final StubServiceEmitter emitter = new StubServiceEmitter("service",
"host");
Assert.assertTrue(monitor.doMonitor(emitter));
- Assert.assertEquals(0, emitter.getEvents().size());
+ Assert.assertEquals(0, emitter.getNumEmittedEvents());
emitter.flush();
TestUtils.copyOrReplaceResource("/cgroupv2/io.stat-2", statFile);
Assert.assertTrue(monitor.doMonitor(emitter));
- Assert.assertEquals(4, emitter.getEvents().size());
+ Assert.assertEquals(4, emitter.getNumEmittedEvents());
Assert.assertTrue(
emitter
.getEvents()
diff --git
a/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java
b/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java
index 97dd707299e..94cb1d9b3aa 100644
---
a/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java
+++
b/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java
@@ -94,10 +94,10 @@ public class CpuAcctDeltaMonitorTest
final StubServiceEmitter emitter = new StubServiceEmitter("service",
"host");
Assert.assertFalse(monitor.doMonitor(emitter));
// First should just cache
- Assert.assertEquals(0, emitter.getEvents().size());
+ Assert.assertEquals(0, emitter.getNumEmittedEvents());
Assert.assertTrue(cpuacct.delete());
TestUtils.copyResource("/cpuacct.usage_all", cpuacct);
Assert.assertTrue(monitor.doMonitor(emitter));
- Assert.assertEquals(2 * 128 + 1, emitter.getEvents().size());
+ Assert.assertEquals(2 * 128 + 1, emitter.getNumEmittedEvents());
}
}
diff --git
a/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java
b/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java
index 2be52c091e7..819c9ba50d7 100644
---
a/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java
+++
b/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java
@@ -25,10 +25,6 @@ import
org.apache.druid.java.util.emitter.core.HttpPostEmitter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.Map;
-import java.util.Queue;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -65,26 +61,19 @@ public class HttpPostEmitterMonitorTest
assertTrue(monitor.doMonitor(stubServiceEmitter));
- final Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>>
metricEvents = stubServiceEmitter.getMetricEvents();
-
- assertMetricValue(metricEvents, "emitter/successfulSending/maxTimeMs", 0);
- assertMetricValue(metricEvents, "emitter/events/emitted/delta", 100L);
- assertMetricValue(metricEvents, "emitter/successfulSending/minTimeMs", 0);
- assertMetricValue(metricEvents, "emitter/buffers/emitQueue", 30);
- assertMetricValue(metricEvents, "emitter/failedSending/minTimeMs", 0);
- assertMetricValue(metricEvents, "emitter/buffers/allocated/delta", 20);
- assertMetricValue(metricEvents, "emitter/batchFilling/maxTimeMs", 0);
- assertMetricValue(metricEvents, "emitter/buffers/dropped/delta", 10);
- assertMetricValue(metricEvents, "emitter/batchFilling/minTimeMs", 0);
- assertMetricValue(metricEvents, "emitter/events/emitQueue", 200L);
- assertMetricValue(metricEvents, "emitter/events/large/emitQueue", 75L);
- assertMetricValue(metricEvents, "emitter/buffers/reuseQueue", 15);
- assertMetricValue(metricEvents, "emitter/buffers/failed/delta", 5);
- assertMetricValue(metricEvents, "emitter/failedSending/maxTimeMs", 0L);
- }
-
- private void assertMetricValue(Map<String,
Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents, String
metricName, Number expectedValue)
- {
-
assertEquals(metricEvents.get(metricName).peek().getMetricEvent().getValue().doubleValue(),
expectedValue.doubleValue());
+ stubServiceEmitter.verifyValue("emitter/successfulSending/maxTimeMs", 0);
+ stubServiceEmitter.verifyValue("emitter/events/emitted/delta", 100L);
+ stubServiceEmitter.verifyValue("emitter/successfulSending/minTimeMs", 0);
+ stubServiceEmitter.verifyValue("emitter/buffers/emitQueue", 30);
+ stubServiceEmitter.verifyValue("emitter/failedSending/minTimeMs", 0);
+ stubServiceEmitter.verifyValue("emitter/buffers/allocated/delta", 20);
+ stubServiceEmitter.verifyValue("emitter/batchFilling/maxTimeMs", 0);
+ stubServiceEmitter.verifyValue("emitter/buffers/dropped/delta", 10);
+ stubServiceEmitter.verifyValue("emitter/batchFilling/minTimeMs", 0);
+ stubServiceEmitter.verifyValue("emitter/events/emitQueue", 200L);
+ stubServiceEmitter.verifyValue("emitter/events/large/emitQueue", 75L);
+ stubServiceEmitter.verifyValue("emitter/buffers/reuseQueue", 15);
+ stubServiceEmitter.verifyValue("emitter/buffers/failed/delta", 5);
+ stubServiceEmitter.verifyValue("emitter/failedSending/maxTimeMs", 0);
}
}
diff --git
a/processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java
b/processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java
index ece54cd3fa6..c31ee8ea776 100644
---
a/processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java
+++
b/processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java
@@ -107,7 +107,7 @@ public class OshiSysMonitorTest
m.start();
m.monitorMemStats(emitter);
m.stop();
- Assert.assertEquals(3, emitter.getEvents().size());
+ Assert.assertEquals(3, emitter.getNumEmittedEvents());
emitter.verifyEmitted("sys/mem/max", 1);
emitter.verifyEmitted("sys/mem/used", 1);
emitter.verifyEmitted("sys/mem/free", 1);
@@ -129,7 +129,7 @@ public class OshiSysMonitorTest
m.start();
m.doMonitor(emitter);
m.stop();
- Assert.assertEquals(3, emitter.getEvents().size());
+ Assert.assertEquals(3, emitter.getNumEmittedEvents());
emitter.verifyEmitted("sys/mem/max", 1);
emitter.verifyEmitted("sys/mem/used", 1);
emitter.verifyEmitted("sys/mem/free", 1);
@@ -153,7 +153,7 @@ public class OshiSysMonitorTest
OshiSysMonitor m = createMonitor(si);
m.start();
m.monitorSwapStats(emitter);
- Assert.assertEquals(4, emitter.getEvents().size());
+ Assert.assertEquals(4, emitter.getNumEmittedEvents());
emitter.verifyEmitted("sys/swap/pageIn", 1);
emitter.verifyEmitted("sys/swap/pageOut", 1);
emitter.verifyEmitted("sys/swap/max", 1);
@@ -201,7 +201,7 @@ public class OshiSysMonitorTest
OshiSysMonitor m = createMonitor(si);
m.start();
m.monitorFsStats(emitter);
- Assert.assertEquals(8, emitter.getEvents().size());
+ Assert.assertEquals(8, emitter.getNumEmittedEvents());
emitter.verifyEmitted("sys/fs/max", 2);
emitter.verifyEmitted("sys/fs/used", 2);
emitter.verifyEmitted("sys/fs/files/count", 2);
@@ -272,7 +272,7 @@ public class OshiSysMonitorTest
OshiSysMonitor m = createMonitor(si);
m.start();
m.monitorDiskStats(emitter);
- Assert.assertEquals(0, emitter.getEvents().size());
+ Assert.assertEquals(0, emitter.getNumEmittedEvents());
Mockito.when(disk1.getReadBytes()).thenReturn(400L);
Mockito.when(disk1.getReads()).thenReturn(220L);
@@ -288,7 +288,7 @@ public class OshiSysMonitorTest
Mockito.when(disk2.getTransferTime()).thenReturn(1100L);
m.monitorDiskStats(emitter);
- Assert.assertEquals(12, emitter.getEvents().size());
+ Assert.assertEquals(12, emitter.getNumEmittedEvents());
Map<String, Object> userDims1 = ImmutableMap.of(
"diskName",
@@ -362,7 +362,7 @@ public class OshiSysMonitorTest
OshiSysMonitor m = createMonitor(si);
m.start();
m.monitorNetStats(emitter);
- Assert.assertEquals(0, emitter.getEvents().size());
+ Assert.assertEquals(0, emitter.getNumEmittedEvents());
Mockito.when(net1.getBytesRecv()).thenReturn(400L);
Mockito.when(net1.getPacketsRecv()).thenReturn(220L);
@@ -375,7 +375,7 @@ public class OshiSysMonitorTest
m.monitorNetStats(emitter);
- Assert.assertEquals(16, emitter.getEvents().size()); // 8 * 2 whitelisted
ips
+ Assert.assertEquals(16, emitter.getNumEmittedEvents()); // 8 * 2
whitelisted ips
Map<String, Object> userDims1 = ImmutableMap.of(
"netName",
@@ -460,7 +460,7 @@ public class OshiSysMonitorTest
OshiSysMonitor m = createMonitor(si);
m.start();
m.monitorCpuStats(emitter);
- Assert.assertEquals(0, emitter.getEvents().size());
+ Assert.assertEquals(0, emitter.getNumEmittedEvents());
long[][] procTicks2 = new long[][]{
{4L, 5L, 6L, 8L, 9L, 7L, 10L, 12L}, // Δtick1 {3,3,3,4,4,1,3,4}
_total = 25, emitted percentage
@@ -470,7 +470,7 @@ public class OshiSysMonitorTest
m.monitorCpuStats(emitter);
m.stop();
- Assert.assertEquals(16, emitter.getEvents().size()); // 8 ticktype * 2
processors
+ Assert.assertEquals(16, emitter.getNumEmittedEvents()); // 8 ticktype * 2
processors
Map<String, Object> userDims = new HashMap<>();
userDims.put("cpuName", "0");
@@ -557,7 +557,7 @@ public class OshiSysMonitorTest
OshiSysMonitor m = createMonitor(si);
m.start();
m.monitorSysStats(emitter);
- Assert.assertEquals(4, emitter.getEvents().size());
+ Assert.assertEquals(4, emitter.getNumEmittedEvents());
m.stop();
emitter.verifyEmitted("sys/uptime", 1);
emitter.verifyEmitted("sys/la/1", 1);
@@ -592,7 +592,7 @@ public class OshiSysMonitorTest
m.start();
m.monitorTcpStats(emitter);
- Assert.assertEquals(0, emitter.getEvents().size());
+ Assert.assertEquals(0, emitter.getNumEmittedEvents());
Mockito.when(tcpv4.getConnectionsActive()).thenReturn(20L);
Mockito.when(tcpv4.getConnectionsPassive()).thenReturn(25L);
Mockito.when(tcpv4.getConnectionFailures()).thenReturn(8L);
@@ -604,7 +604,7 @@ public class OshiSysMonitorTest
Mockito.when(tcpv4.getSegmentsRetransmitted()).thenReturn(8L);
m.monitorTcpStats(emitter);
m.stop();
- Assert.assertEquals(9, emitter.getEvents().size());
+ Assert.assertEquals(9, emitter.getNumEmittedEvents());
emitter.verifyValue("sys/tcpv4/activeOpens", 10L);
emitter.verifyValue("sys/tcpv4/passiveOpens", 5L);
emitter.verifyValue("sys/tcpv4/attemptFails", 3L);
diff --git
a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
index 55113b97ac2..b330573c362 100644
---
a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
+++
b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
@@ -41,7 +41,7 @@ public class StubServiceEmitter extends ServiceEmitter
implements MetricsVerifie
{
private final Queue<Event> events = new ConcurrentLinkedDeque<>();
private final Queue<AlertEvent> alertEvents = new ConcurrentLinkedDeque<>();
- private final ConcurrentHashMap<String, Queue<ServiceMetricEventSnapshot>>
metricEvents = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, Queue<ServiceMetricEvent>>
metricEvents = new ConcurrentHashMap<>();
public StubServiceEmitter()
{
@@ -59,7 +59,7 @@ public class StubServiceEmitter extends ServiceEmitter
implements MetricsVerifie
if (event instanceof ServiceMetricEvent) {
ServiceMetricEvent metricEvent = (ServiceMetricEvent) event;
metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new
ConcurrentLinkedDeque<>())
- .add(new ServiceMetricEventSnapshot(metricEvent));
+ .add(metricEvent.copy());
} else if (event instanceof AlertEvent) {
alertEvents.add((AlertEvent) event);
}
@@ -74,14 +74,20 @@ public class StubServiceEmitter extends ServiceEmitter
implements MetricsVerifie
return new ArrayList<>(events);
}
+ public int getNumEmittedEvents()
+ {
+ return events.size();
+ }
+
/**
- * Gets all the metric events emitted since the previous {@link #flush()}.
+ * Gets all the metric events emitted for the given metric name since the
previous {@link #flush()}.
*
- * @return Map from metric name to list of events emitted for that metric.
+ * @return List of events emitted for the given metric.
*/
- public Map<String, Queue<ServiceMetricEventSnapshot>> getMetricEvents()
+ public List<ServiceMetricEvent> getMetricEvents(String metricName)
{
- return metricEvents;
+ final Queue<ServiceMetricEvent> metricEventQueue =
metricEvents.get(metricName);
+ return metricEventQueue == null ? List.of() :
List.copyOf(metricEventQueue);
}
/**
@@ -99,18 +105,18 @@ public class StubServiceEmitter extends ServiceEmitter
implements MetricsVerifie
)
{
final List<Number> values = new ArrayList<>();
- final Queue<ServiceMetricEventSnapshot> events =
+ final Queue<ServiceMetricEvent> events =
metricEvents.getOrDefault(metricName, new ArrayDeque<>());
final Map<String, Object> filters =
dimensionFilters == null ? Collections.emptyMap() : dimensionFilters;
- for (ServiceMetricEventSnapshot event : events) {
+ for (ServiceMetricEvent event : events) {
final Map<String, Object> userDims = event.getUserDims();
boolean match = filters.keySet().stream()
.map(d -> filters.get(d).equals(userDims.get(d)))
.reduce((a, b) -> a && b)
.orElse(true);
if (match) {
- values.add(event.getMetricEvent().getValue());
+ values.add(event.getValue());
}
}
@@ -134,32 +140,4 @@ public class StubServiceEmitter extends ServiceEmitter
implements MetricsVerifie
public void close()
{
}
-
- /**
- * Helper class to encapsulate a ServiceMetricEvent and its user dimensions.
- * Since {@link StubServiceEmitter} doesn't actually emit metrics and saves
the emitted metrics in-memory,
- * this helper class saves a copy of {@link ServiceMetricEvent#userDims} of
emitted metrics
- * via {@link ServiceMetricEvent#getUserDims()} as it can get mutated.
- */
- public static class ServiceMetricEventSnapshot
- {
- private final ServiceMetricEvent metricEvent;
- private final Map<String, Object> userDims;
-
- public ServiceMetricEventSnapshot(ServiceMetricEvent metricEvent)
- {
- this.metricEvent = metricEvent;
- this.userDims = metricEvent.getUserDims();
- }
-
- public ServiceMetricEvent getMetricEvent()
- {
- return metricEvent;
- }
-
- public Map<String, Object> getUserDims()
- {
- return userDims;
- }
- }
}
diff --git
a/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java
b/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java
index 4647704fe27..32d9852d0c1 100644
---
a/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java
+++
b/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java
@@ -69,7 +69,7 @@ public class CPUTimeMetricQueryRunnerTest
);
Assert.assertEquals(expectedResults, results.toList());
- Assert.assertEquals(1, emitter.getEvents().size());
+ Assert.assertEquals(1, emitter.getNumEmittedEvents());
final Event event = Iterables.getOnlyElement(emitter.getEvents());
diff --git
a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
index ce7481c4e4a..f7cd6887e7e 100644
---
a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
+++
b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
@@ -138,7 +138,7 @@ public class DefaultQueryMetricsTest extends
InitializedNullHandlingTest
// Verify that Queried Segment Count does not get emitted by the
DefaultQueryMetrics
// and the total number of emitted metrics remains unchanged
queryMetrics.reportQueriedSegmentCount(25).emit(serviceEmitter);
- Assert.assertEquals(10, serviceEmitter.getEvents().size());
+ Assert.assertEquals(10, serviceEmitter.getNumEmittedEvents());
}
@Test
diff --git
a/server/src/test/java/org/apache/druid/client/cache/MemcachedCacheTest.java
b/server/src/test/java/org/apache/druid/client/cache/MemcachedCacheTest.java
index f87c7b679cc..e8209a444cb 100644
--- a/server/src/test/java/org/apache/druid/client/cache/MemcachedCacheTest.java
+++ b/server/src/test/java/org/apache/druid/client/cache/MemcachedCacheTest.java
@@ -19,7 +19,6 @@
package org.apache.druid.client.cache;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -50,11 +49,9 @@ import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.initialization.Initialization;
-import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.AbstractMonitor;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
@@ -206,16 +203,13 @@ public class MemcachedCacheTest extends
CacheTestBase<MemcachedCache>
final MemcachedCache cache = MemcachedCache.create(memcachedCacheConfig);
final StubServiceEmitter serviceEmitter = new
StubServiceEmitter("service", "host");
- while (serviceEmitter.getEvents().isEmpty()) {
+ while (serviceEmitter.getNumEmittedEvents() <= 0) {
Thread.sleep(memcachedCacheConfig.getTimeout());
cache.doMonitor(serviceEmitter);
}
- Assert.assertFalse(serviceEmitter.getEvents().isEmpty());
- ObjectMapper mapper = new DefaultObjectMapper();
- for (Event event : serviceEmitter.getEvents()) {
- log.debug("Found event `%s`", mapper.writeValueAsString(event.toMap()));
- }
+ Assert.assertTrue(serviceEmitter.getNumEmittedEvents() > 0);
+
Assert.assertFalse(serviceEmitter.getMetricEvents("query/cache/memcached/total").isEmpty());
}
@Test
diff --git
a/server/src/test/java/org/apache/druid/curator/DruidConnectionStateListenerTest.java
b/server/src/test/java/org/apache/druid/curator/DruidConnectionStateListenerTest.java
index 86ff59c25ae..4a9b7a4e36f 100644
---
a/server/src/test/java/org/apache/druid/curator/DruidConnectionStateListenerTest.java
+++
b/server/src/test/java/org/apache/druid/curator/DruidConnectionStateListenerTest.java
@@ -60,7 +60,7 @@ public class DruidConnectionStateListenerTest
public void test_doMonitor_init()
{
listener.doMonitor(emitter);
- Assert.assertEquals(1, emitter.getEvents().size());
+ Assert.assertEquals(1, emitter.getNumEmittedEvents());
emitter.verifyValue("zk/connected", 0);
}
@@ -69,7 +69,7 @@ public class DruidConnectionStateListenerTest
{
listener.stateChanged(null, ConnectionState.CONNECTED);
listener.doMonitor(emitter);
- Assert.assertEquals(1, emitter.getEvents().size());
+ Assert.assertEquals(1, emitter.getNumEmittedEvents());
emitter.verifyValue("zk/connected", 1);
}
@@ -79,7 +79,7 @@ public class DruidConnectionStateListenerTest
{
listener.stateChanged(null, ConnectionState.SUSPENDED);
listener.doMonitor(emitter);
- Assert.assertEquals(2, emitter.getEvents().size()); // 2 because
stateChanged emitted an alert
+ Assert.assertEquals(2, emitter.getNumEmittedEvents()); // 2 because
stateChanged emitted an alert
emitter.verifyValue("zk/connected", 0);
}
@@ -88,7 +88,7 @@ public class DruidConnectionStateListenerTest
public void test_suspendedAlert()
{
listener.stateChanged(null, ConnectionState.SUSPENDED);
- Assert.assertEquals(1, emitter.getEvents().size());
+ Assert.assertEquals(1, emitter.getNumEmittedEvents());
final AlertEvent alert = emitter.getAlerts().get(0);
Assert.assertEquals("alerts", alert.getFeed());
@@ -99,10 +99,10 @@ public class DruidConnectionStateListenerTest
public void test_reconnectedMetric()
{
listener.stateChanged(null, ConnectionState.SUSPENDED);
- Assert.assertEquals(1, emitter.getEvents().size()); // the first
stateChanged emits an alert
+ Assert.assertEquals(1, emitter.getNumEmittedEvents()); // the first
stateChanged emits an alert
listener.stateChanged(null, ConnectionState.RECONNECTED);
- Assert.assertEquals(2, emitter.getEvents().size()); // the second
stateChanged emits a metric
+ Assert.assertEquals(2, emitter.getNumEmittedEvents()); // the second
stateChanged emits a metric
long observedReconnectTime = emitter.getValue("zk/reconnect/time",
null).longValue();
Assert.assertTrue(observedReconnectTime >= 0);
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
index 463c4ed8c53..f7e2bb0b57e 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
@@ -78,7 +78,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -2279,17 +2278,15 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
private void verifySinkMetrics(StubServiceEmitter emitter, Set<String>
segmentIds)
{
- Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> events =
emitter.getMetricEvents();
int segments = segmentIds.size();
- Assert.assertEquals(4, events.size());
- Assert.assertTrue(events.containsKey("query/cpu/time"));
- Assert.assertEquals(segments, events.get("query/segment/time").size());
- Assert.assertEquals(segments,
events.get("query/segmentAndCache/time").size());
- Assert.assertEquals(segments, events.get("query/wait/time").size());
+ emitter.verifyEmitted("query/cpu/time", 1);
+ Assert.assertEquals(segments,
emitter.getMetricEvents("query/segment/time").size());
+ Assert.assertEquals(segments,
emitter.getMetricEvents("query/segmentAndCache/time").size());
+ Assert.assertEquals(segments,
emitter.getMetricEvents("query/wait/time").size());
for (String id : segmentIds) {
-
Assert.assertTrue(events.get("query/segment/time").stream().anyMatch(value ->
value.getUserDims().containsValue(id)));
-
Assert.assertTrue(events.get("query/segmentAndCache/time").stream().anyMatch(value
-> value.getUserDims().containsValue(id)));
- Assert.assertTrue(events.get("query/wait/time").stream().anyMatch(value
-> value.getUserDims().containsValue(id)));
+
Assert.assertTrue(emitter.getMetricEvents("query/segment/time").stream().anyMatch(value
-> value.getUserDims().containsValue(id)));
+
Assert.assertTrue(emitter.getMetricEvents("query/segmentAndCache/time").stream().anyMatch(value
-> value.getUserDims().containsValue(id)));
+
Assert.assertTrue(emitter.getMetricEvents("query/wait/time").stream().anyMatch(value
-> value.getUserDims().containsValue(id)));
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
index 484ae809fc0..db1b97bce63 100644
---
a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
+++
b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
@@ -31,8 +31,8 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.core.EventMap;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DataSource;
@@ -987,9 +987,7 @@ public class ClientQuerySegmentWalkerTest
ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L})
);
- List<Event> events = emitter.getEvents();
-
- for (Event event : events) {
+ for (ServiceMetricEvent event :
emitter.getMetricEvents(ClientQuerySegmentWalker.ROWS_COUNT_METRIC)) {
EventMap map = event.toMap();
if
(ClientQuerySegmentWalker.ROWS_COUNT_METRIC.equals(map.get("metric"))) {
Assert.assertTrue(map.containsKey("host"));
@@ -1038,23 +1036,22 @@ public class ClientQuerySegmentWalkerTest
ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L})
);
- List<Event> events = emitter.getEvents();
+ for (ServiceMetricEvent event :
emitter.getMetricEvents(ClientQuerySegmentWalker.ROWS_COUNT_METRIC)) {
+ EventMap map = event.toMap();
+ Assert.assertTrue(map.containsKey("host"));
+ Assert.assertTrue(map.containsKey("service"));
+ Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID));
+ Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID));
+ Assert.assertEquals(3, map.get("value"));
+ }
- for (Event event : events) {
+ for (ServiceMetricEvent event :
emitter.getMetricEvents(ClientQuerySegmentWalker.BYTES_COUNT_METRIC)) {
EventMap map = event.toMap();
- if
(ClientQuerySegmentWalker.ROWS_COUNT_METRIC.equals(map.get("metric"))) {
- Assert.assertTrue(map.containsKey("host"));
- Assert.assertTrue(map.containsKey("service"));
- Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID));
- Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID));
- Assert.assertEquals(3, map.get("value"));
- } else if
(ClientQuerySegmentWalker.BYTES_COUNT_METRIC.equals(map.get("metric"))) {
- Assert.assertTrue(map.containsKey("host"));
- Assert.assertTrue(map.containsKey("service"));
- Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID));
- Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID));
- Assert.assertEquals(43L, map.get("value"));
- }
+ Assert.assertTrue(map.containsKey("host"));
+ Assert.assertTrue(map.containsKey("service"));
+ Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID));
+ Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID));
+ Assert.assertEquals(43L, map.get("value"));
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
index 3505eb943a2..650bf7d02cf 100644
---
a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
+++
b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
@@ -43,7 +43,6 @@ import org.mockito.junit.MockitoJUnitRunner;
import java.io.IOException;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import java.util.TreeMap;
@RunWith(MockitoJUnitRunner.class)
@@ -92,14 +91,10 @@ public class SQLAuditManagerTest
final AuditEntry entry = createAuditEntry("testKey", "testType",
DateTimes.nowUtc());
auditManager.doAudit(entry);
- Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>>
metricEvents = serviceEmitter.getMetricEvents();
- Assert.assertEquals(1, metricEvents.size());
-
- Queue<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents =
metricEvents.get("config/audit");
- Assert.assertNotNull(auditMetricEvents);
+ List<ServiceMetricEvent> auditMetricEvents =
serviceEmitter.getMetricEvents("config/audit");
Assert.assertEquals(1, auditMetricEvents.size());
- ServiceMetricEvent metric = auditMetricEvents.peek().getMetricEvent();
+ ServiceMetricEvent metric = auditMetricEvents.get(0);
final AuditEntry dbEntry = lookupAuditEntryForKey("testKey");
Assert.assertNotNull(dbEntry);
@@ -121,14 +116,12 @@ public class SQLAuditManagerTest
Assert.assertEquals(entry, dbEntry);
// Verify emitted metrics
- Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>>
metricEvents = serviceEmitter.getMetricEvents();
- Assert.assertEquals(1, metricEvents.size());
-
- Queue<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents =
metricEvents.get("config/audit");
+ List<ServiceMetricEvent> auditMetricEvents
+ = serviceEmitter.getMetricEvents("config/audit");
Assert.assertNotNull(auditMetricEvents);
Assert.assertEquals(1, auditMetricEvents.size());
- ServiceMetricEvent metric = auditMetricEvents.peek().getMetricEvent();
+ ServiceMetricEvent metric = auditMetricEvents.get(0);
Assert.assertEquals(dbEntry.getKey(), metric.getUserDims().get("key"));
Assert.assertEquals(dbEntry.getType(), metric.getUserDims().get("type"));
Assert.assertNull(metric.getUserDims().get("payload"));
diff --git
a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManagerTest.java
b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManagerTest.java
index c94d8e43668..735d19bd95f 100644
---
a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManagerTest.java
+++
b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManagerTest.java
@@ -142,7 +142,7 @@ public class LookupCoordinatorManagerTest
@After
public void tearDown()
{
- Assert.assertEquals(0, SERVICE_EMITTER.getEvents().size());
+ Assert.assertEquals(0, SERVICE_EMITTER.getNumEmittedEvents());
SERVICE_EMITTER.flush();
}
diff --git
a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
index f16acd0b060..aa5955cd914 100644
---
a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
+++
b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
@@ -31,13 +31,11 @@ import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
public class GroupByStatsMonitorTest
{
@@ -83,20 +81,15 @@ public class GroupByStatsMonitorTest
emitter.flush();
// Trigger metric emission
monitor.doMonitor(emitter);
- Map<String, Long> resultMap = emitter.getEvents()
- .stream()
- .collect(Collectors.toMap(
- event -> (String)
event.toMap().get("metric"),
- event -> (Long)
event.toMap().get("value")
- ));
- Assert.assertEquals(7, resultMap.size());
- Assert.assertEquals(0, (long)
resultMap.get("mergeBuffer/pendingRequests"));
- Assert.assertEquals(0, (long) resultMap.get("mergeBuffer/used"));
- Assert.assertEquals(1, (long) resultMap.get("mergeBuffer/queries"));
- Assert.assertEquals(100, (long)
resultMap.get("mergeBuffer/acquisitionTimeNs"));
- Assert.assertEquals(2, (long) resultMap.get("groupBy/spilledQueries"));
- Assert.assertEquals(200, (long) resultMap.get("groupBy/spilledBytes"));
- Assert.assertEquals(300, (long)
resultMap.get("groupBy/mergeDictionarySize"));
+
+ Assert.assertEquals(7, emitter.getNumEmittedEvents());
+ emitter.verifyValue("mergeBuffer/pendingRequests", 0L);
+ emitter.verifyValue("mergeBuffer/used", 0L);
+ emitter.verifyValue("mergeBuffer/queries", 1L);
+ emitter.verifyValue("mergeBuffer/acquisitionTimeNs", 100L);
+ emitter.verifyValue("groupBy/spilledQueries", 2L);
+ emitter.verifyValue("groupBy/spilledBytes", 200L);
+ emitter.verifyValue("groupBy/mergeDictionarySize", 300L);
}
@Test
diff --git
a/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java
b/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java
index 92c2a1064e7..05865e0a2df 100644
---
a/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java
+++
b/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java
@@ -23,18 +23,15 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.DruidServerConfig;
import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.SegmentLoadDropHandler;
import org.apache.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.util.List;
import java.util.Map;
public class HistoricalMetricsMonitorTest extends EasyMockSupport
@@ -95,54 +92,26 @@ public class HistoricalMetricsMonitorTest extends
EasyMockSupport
monitor.doMonitor(serviceEmitter);
EasyMock.verify(druidServerConfig, segmentManager, segmentLoadDropMgr);
- final List<Event> events = serviceEmitter.getEvents();
-
- Assert.assertEquals(ImmutableMap.<String, Object>of(
- "metric", "segment/max",
- "value", maxSize
- ), asMap(events.get(0)));
-
- Assert.assertEquals(ImmutableMap.<String, Object>of(
- "dataSource", dataSource,
- "metric", "segment/pendingDelete",
- "priority", String.valueOf(priority),
- "tier", tier,
- "value", dataSegment.getSize()
- ), asMap(events.get(1)));
-
- Assert.assertEquals(ImmutableMap.<String, Object>of(
- "metric", "segment/used",
- "value", dataSegment.getSize(),
- "tier", tier,
- "priority", String.valueOf(priority),
- "dataSource", dataSource
- ), asMap(events.get(2)));
-
- Assert.assertEquals(ImmutableMap.<String, Object>of(
- "metric", "segment/usedPercent",
- "value", dataSegment.getSize() * 1.0D / maxSize,
- "tier", tier,
- "priority", String.valueOf(priority),
- "dataSource", dataSource
- ), asMap(events.get(3)));
-
- Assert.assertEquals(ImmutableMap.<String, Object>of(
- "metric", "segment/count",
- "value", 1L,
- "tier", tier,
- "priority", String.valueOf(priority),
- "dataSource", dataSource
- ), asMap(events.get(4)));
- }
-
- private Map<String, Object> asMap(Event event)
- {
- final Map<String, Object> map = event.toMap();
- Assert.assertNotNull(map.remove("feed"));
- Assert.assertNotNull(map.remove("timestamp"));
- Assert.assertNotNull(map.remove("service"));
- Assert.assertNotNull(map.remove("host"));
-
- return map;
+ serviceEmitter.verifyValue("segment/max", maxSize);
+ serviceEmitter.verifyValue(
+ "segment/pendingDelete",
+ Map.of("tier", tier, "dataSource", dataSource, "priority",
String.valueOf(priority)),
+ dataSegment.getSize()
+ );
+ serviceEmitter.verifyValue(
+ "segment/used",
+ Map.of("tier", tier, "priority", String.valueOf(priority),
"dataSource", dataSource),
+ dataSegment.getSize()
+ );
+ serviceEmitter.verifyValue(
+ "segment/usedPercent",
+ Map.of("tier", tier, "priority", String.valueOf(priority),
"dataSource", dataSource),
+ dataSegment.getSize() * 1.0D / maxSize
+ );
+ serviceEmitter.verifyValue(
+ "segment/count",
+ Map.of("tier", tier, "priority", String.valueOf(priority),
"dataSource", dataSource),
+ 1L
+ );
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
index aeed3c40ea9..ef78930f65a 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
@@ -20,7 +20,6 @@
package org.apache.druid.server.metrics;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
@@ -34,10 +33,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
@@ -55,47 +53,54 @@ public class LatchableEmitter extends StubServiceEmitter
public static final String TYPE = "latching";
+ private final Set<WaitCondition> waitConditions = new HashSet<>();
+
+ private final ReentrantLock eventProcessingLock = new ReentrantLock();
+
/**
- * Single-threaded executor to evaluate conditions.
+ * Lists of events that have already been processed by {@link
#evaluateWaitConditions(Event)}.
*/
- private final ScheduledExecutorService conditionEvaluateExecutor;
- private final Set<WaitCondition> waitConditions = new HashSet<>();
- private final ReentrantReadWriteLock eventReadWriteLock = new
ReentrantReadWriteLock(true);
+ private final List<Event> processedEvents = new ArrayList<>();
/**
* Creates a {@link StubServiceEmitter} that may be used in embedded tests.
*/
- public LatchableEmitter(String service, String host,
ScheduledExecutorFactory executorFactory)
+ public LatchableEmitter(String service, String host)
{
super(service, host);
- this.conditionEvaluateExecutor = executorFactory.create(1,
"LatchingEmitter-eval-%d");
}
@Override
public void emit(Event event)
{
super.emit(event);
- triggerConditionEvaluations();
+ evaluateWaitConditions(event);
}
@Override
public void flush()
{
- // flush() or close() is typically not called in tests until the test is
complete
- // but acquire a lock all the same for the sake of completeness
- eventReadWriteLock.writeLock().lock();
+ eventProcessingLock.lock();
try {
super.flush();
+ processedEvents.clear();
}
finally {
- eventReadWriteLock.writeLock().unlock();
+ eventProcessingLock.unlock();
}
}
@Override
public void close()
{
- flush();
+ eventProcessingLock.lock();
+ try {
+ super.close();
+ processedEvents.clear();
+ }
+ finally {
+ eventProcessingLock.unlock();
+ }
}
/**
@@ -107,9 +112,8 @@ public class LatchableEmitter extends StubServiceEmitter
public void waitForEvent(Predicate<Event> condition, long timeoutMillis)
{
final WaitCondition waitCondition = new WaitCondition(condition);
- waitConditions.add(waitCondition);
+ registerWaitCondition(waitCondition);
- triggerConditionEvaluations();
try {
final long awaitTime = timeoutMillis >= 0 ? timeoutMillis :
Long.MAX_VALUE;
if (!waitCondition.countDownLatch.await(awaitTime,
TimeUnit.MILLISECONDS)) {
@@ -158,22 +162,12 @@ public class LatchableEmitter extends StubServiceEmitter
);
}
- private void triggerConditionEvaluations()
- {
- if (conditionEvaluateExecutor == null) {
- throw new ISE("Cannot evaluate conditions as the
'conditionEvaluateExecutor' is null.");
- } else {
- conditionEvaluateExecutor.submit(this::evaluateWaitConditions);
- }
- }
-
/**
- * Evaluates wait conditions. This method must be invoked on the
- * {@link #conditionEvaluateExecutor} so that it does not block {@link
#emit(Event)}.
+ * Evaluates wait conditions for the given event.
*/
- private void evaluateWaitConditions()
+ private void evaluateWaitConditions(Event event)
{
- eventReadWriteLock.readLock().lock();
+ eventProcessingLock.lock();
try {
// Create a copy of the conditions for thread-safety
final List<WaitCondition> conditionsToEvaluate =
List.copyOf(waitConditions);
@@ -181,25 +175,46 @@ public class LatchableEmitter extends StubServiceEmitter
return;
}
- List<Event> events = getEvents();
for (WaitCondition condition : conditionsToEvaluate) {
- final int currentNumberOfEvents = events.size();
-
- // Do not use an iterator over the list to avoid concurrent
modification exceptions
- // Evaluate new events against this condition
- for (int i = condition.processedUntil; i < currentNumberOfEvents; ++i)
{
- if (condition.predicate.test(events.get(i))) {
- condition.countDownLatch.countDown();
- }
+ if (condition.predicate.test(event)) {
+ condition.countDownLatch.countDown();
+ }
+ }
+ }
+ catch (Exception e) {
+ log.error(e, "Error while evaluating wait conditions for event[%s]",
event.toMap());
+ throw new ISE(e, "Error while evaluating wait conditions for event[%s]",
event.toMap());
+ }
+ finally {
+ processedEvents.add(event);
+ eventProcessingLock.unlock();
+ }
+ }
+
+ /**
+ * Evaluates the given new condition for all past events and then adds it to
+ * {@link #waitConditions}.
+ */
+ private void registerWaitCondition(WaitCondition condition)
+ {
+ eventProcessingLock.lock();
+ try {
+ for (Event event : processedEvents) {
+ if (condition.predicate.test(event)) {
+ condition.countDownLatch.countDown();
+ break;
}
- condition.processedUntil = currentNumberOfEvents;
+ }
+
+ if (condition.countDownLatch.getCount() > 0) {
+ waitConditions.add(condition);
}
}
catch (Exception e) {
- log.error(e, "Error while evaluating wait conditions");
+ throw new ISE(e, "Error while evaluating condition");
}
finally {
- eventReadWriteLock.readLock().unlock();
+ eventProcessingLock.unlock();
}
}
@@ -208,8 +223,6 @@ public class LatchableEmitter extends StubServiceEmitter
private final Predicate<Event> predicate;
private final CountDownLatch countDownLatch;
- private int processedUntil;
-
private WaitCondition(Predicate<Event> predicate)
{
this.predicate = predicate;
diff --git
a/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java
b/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java
index 58532d290d2..6f6377740a7 100644
---
a/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java
+++
b/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java
@@ -31,10 +31,8 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.stream.Collectors;
public class QueryCountStatsMonitorTest
{
@@ -111,18 +109,12 @@ public class QueryCountStatsMonitorTest
emitter.flush();
// Trigger metric emission
monitor.doMonitor(emitter);
- Map<String, Long> resultMap = emitter.getEvents()
- .stream()
- .collect(Collectors.toMap(
- event -> (String)
event.toMap().get("metric"),
- event -> (Long)
event.toMap().get("value")
- ));
- Assert.assertEquals(5, resultMap.size());
- Assert.assertEquals(1L, (long) resultMap.get("query/success/count"));
- Assert.assertEquals(2L, (long) resultMap.get("query/failed/count"));
- Assert.assertEquals(3L, (long) resultMap.get("query/interrupted/count"));
- Assert.assertEquals(4L, (long) resultMap.get("query/timeout/count"));
- Assert.assertEquals(10L, (long) resultMap.get("query/count"));
+ Assert.assertEquals(5, emitter.getNumEmittedEvents());
+ emitter.verifyValue("query/success/count", 1L);
+ emitter.verifyValue("query/failed/count", 2L);
+ emitter.verifyValue("query/interrupted/count", 3L);
+ emitter.verifyValue("query/timeout/count", 4L);
+ emitter.verifyValue("query/count", 10L);
}
@Test
@@ -137,18 +129,13 @@ public class QueryCountStatsMonitorTest
emitter.flush();
// Trigger metric emission
monitor.doMonitor(emitter);
- Map<String, Long> resultMap = emitter.getEvents()
- .stream()
- .collect(Collectors.toMap(
- event -> (String)
event.toMap().get("metric"),
- event -> (Long)
event.toMap().get("value")
- ));
- Assert.assertEquals(6, resultMap.size());
- Assert.assertEquals(0, (long)
resultMap.get("mergeBuffer/pendingRequests"));
- Assert.assertEquals(1L, (long) resultMap.get("query/success/count"));
- Assert.assertEquals(2L, (long) resultMap.get("query/failed/count"));
- Assert.assertEquals(3L, (long) resultMap.get("query/interrupted/count"));
- Assert.assertEquals(4L, (long) resultMap.get("query/timeout/count"));
- Assert.assertEquals(10L, (long) resultMap.get("query/count"));
+
+ Assert.assertEquals(6, emitter.getNumEmittedEvents());
+ emitter.verifyValue("mergeBuffer/pendingRequests", 0L);
+ emitter.verifyValue("query/success/count", 1L);
+ emitter.verifyValue("query/failed/count", 2L);
+ emitter.verifyValue("query/interrupted/count", 3L);
+ emitter.verifyValue("query/timeout/count", 4L);
+ emitter.verifyValue("query/count", 10L);
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/metrics/ServiceStatusMonitorTest.java
b/server/src/test/java/org/apache/druid/server/metrics/ServiceStatusMonitorTest.java
index 88acb6dca26..748672a12b4 100644
---
a/server/src/test/java/org/apache/druid/server/metrics/ServiceStatusMonitorTest.java
+++
b/server/src/test/java/org/apache/druid/server/metrics/ServiceStatusMonitorTest.java
@@ -32,13 +32,15 @@ public class ServiceStatusMonitorTest
{
private ServiceStatusMonitor monitor;
+ private StubServiceEmitter emitter;
private Map<String, Object> heartbeatTags;
- private Supplier<Map<String, Object>> heartbeatTagsSupplier = () ->
heartbeatTags;
- private static String HEARTBEAT_METRIC_KEY = "service/heartbeat";
+ private final Supplier<Map<String, Object>> heartbeatTagsSupplier = () ->
heartbeatTags;
+ private static final String HEARTBEAT_METRIC_KEY = "service/heartbeat";
@Before
public void setUp()
{
+ emitter = new StubServiceEmitter();
monitor = new ServiceStatusMonitor();
heartbeatTags = new HashMap<>();
monitor.heartbeatTagsSupplier = heartbeatTagsSupplier;
@@ -47,23 +49,18 @@ public class ServiceStatusMonitorTest
@Test
public void testDefaultHeartbeatReported()
{
- final StubServiceEmitter emitter = new StubServiceEmitter("service",
"host");
monitor.doMonitor(emitter);
- Assert.assertEquals(1, emitter.getEvents().size());
- Assert.assertEquals(HEARTBEAT_METRIC_KEY,
emitter.getEvents().get(0).toMap().get("metric"));
- Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("value"));
+ Assert.assertEquals(1, emitter.getNumEmittedEvents());
+ emitter.verifyValue(HEARTBEAT_METRIC_KEY, 1);
}
@Test
public void testLeaderTag()
{
heartbeatTags.put("leader", 1);
- final StubServiceEmitter emitter = new StubServiceEmitter("service",
"host");
monitor.doMonitor(emitter);
- Assert.assertEquals(1, emitter.getEvents().size());
- Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("leader"));
- Assert.assertEquals(HEARTBEAT_METRIC_KEY,
emitter.getEvents().get(0).toMap().get("metric"));
- Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("value"));
+ Assert.assertEquals(1, emitter.getNumEmittedEvents());
+ emitter.verifyValue(HEARTBEAT_METRIC_KEY, Map.of("leader", 1), 1);
}
@Test
@@ -71,12 +68,8 @@ public class ServiceStatusMonitorTest
{
heartbeatTags.put("leader", 1);
heartbeatTags.put("taskRunner", "http");
- final StubServiceEmitter emitter = new StubServiceEmitter("service",
"host");
monitor.doMonitor(emitter);
- Assert.assertEquals(1, emitter.getEvents().size());
- Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("leader"));
- Assert.assertEquals("http",
emitter.getEvents().get(0).toMap().get("taskRunner"));
- Assert.assertEquals(HEARTBEAT_METRIC_KEY,
emitter.getEvents().get(0).toMap().get("metric"));
- Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("value"));
+ Assert.assertEquals(1, emitter.getNumEmittedEvents());
+ emitter.verifyValue(HEARTBEAT_METRIC_KEY, Map.of("leader", 1,
"taskRunner", "http"), 1);
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
index 93687bc1be8..2489d033411 100644
---
a/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
+++
b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
@@ -94,7 +94,7 @@ public class TaskCountStatsMonitorTest
final StubServiceEmitter emitter = new StubServiceEmitter("service",
"host");
monitor.doMonitor(emitter);
- Assert.assertEquals(9, emitter.getEvents().size());
+ Assert.assertEquals(9, emitter.getNumEmittedEvents());
emitter.verifyValue("task/success/count", Map.of("dataSource", "d1",
"taskType", "index"), 1L);
emitter.verifyValue("task/failed/count", Map.of("dataSource", "d1",
"taskType", "index"), 1L);
diff --git
a/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java
b/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java
index 0fed1c9b6bc..092a7b66d01 100644
---
a/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java
+++
b/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java
@@ -74,7 +74,7 @@ public class TaskSlotCountStatsMonitorTest
final TaskSlotCountStatsMonitor monitor = new
TaskSlotCountStatsMonitor(statsProvider);
final StubServiceEmitter emitter = new StubServiceEmitter("service",
"host");
monitor.doMonitor(emitter);
- Assert.assertEquals(5, emitter.getEvents().size());
+ Assert.assertEquals(5, emitter.getNumEmittedEvents());
emitter.verifyValue("taskSlot/total/count", 1L);
emitter.verifyValue("taskSlot/idle/count", 1L);
emitter.verifyValue("taskSlot/used/count", 1L);
diff --git
a/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java
b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java
index a1930bef637..05e10ab0142 100644
---
a/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java
+++
b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java
@@ -222,7 +222,7 @@ public class WorkerTaskCountStatsMonitorTest
new WorkerTaskCountStatsMonitor(injectorForMiddleManager,
ImmutableSet.of(NodeRole.MIDDLE_MANAGER));
final StubServiceEmitter emitter = new StubServiceEmitter("service",
"host");
monitor.doMonitor(emitter);
- Assert.assertEquals(5, emitter.getEvents().size());
+ Assert.assertEquals(5, emitter.getNumEmittedEvents());
emitter.verifyValue(
"worker/task/failed/count",
ImmutableMap.of("category", "workerCategory", "workerVersion",
"workerVersion"),
@@ -257,7 +257,7 @@ public class WorkerTaskCountStatsMonitorTest
new WorkerTaskCountStatsMonitor(injectorForIndexer,
ImmutableSet.of(NodeRole.INDEXER));
final StubServiceEmitter emitter = new StubServiceEmitter("service",
"host");
monitor.doMonitor(emitter);
- Assert.assertEquals(10, emitter.getEvents().size());
+ Assert.assertEquals(10, emitter.getNumEmittedEvents());
emitter.verifyValue(
"worker/task/running/count",
ImmutableMap.of("dataSource", "wikipedia"),
@@ -317,7 +317,7 @@ public class WorkerTaskCountStatsMonitorTest
new WorkerTaskCountStatsMonitor(injectorForMiddleManagerNullStats,
ImmutableSet.of(NodeRole.MIDDLE_MANAGER));
final StubServiceEmitter emitter = new StubServiceEmitter("service",
"host");
monitor.doMonitor(emitter);
- Assert.assertEquals(0, emitter.getEvents().size());
+ Assert.assertEquals(0, emitter.getNumEmittedEvents());
}
@Test
@@ -327,6 +327,6 @@ public class WorkerTaskCountStatsMonitorTest
new WorkerTaskCountStatsMonitor(injectorForPeon,
ImmutableSet.of(NodeRole.PEON));
final StubServiceEmitter emitter = new StubServiceEmitter("service",
"host");
monitor.doMonitor(emitter);
- Assert.assertEquals(0, emitter.getEvents().size());
+ Assert.assertEquals(0, emitter.getNumEmittedEvents());
}
}
diff --git
a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
index 6bcf87fdc4a..f974d361b1f 100644
---
a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
+++
b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
@@ -683,7 +683,8 @@ public class AsyncQueryForwardingServletTest extends
BaseJettyTest
}
catch (NullPointerException ignored) {
}
- Assert.assertEquals("query/time",
stubServiceEmitter.getEvents().get(0).toMap().get("metric"));
+ // Assert.assertEquals("query/time",
stubServiceEmitter.getEvents().get(0).toMap().get("metric"));
+ stubServiceEmitter.verifyEmitted("query/time", 1);
if (!isJDBCSql) {
Assert.assertEquals("dummy",
stubServiceEmitter.getEvents().get(0).toMap().get("id"));
}
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java
b/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java
index 1333164449f..71c228a96e5 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java
@@ -26,7 +26,6 @@ import com.google.inject.name.Names;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.initialization.DruidModule;
-import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.metrics.LatchableEmitter;
@@ -48,10 +47,9 @@ public class LatchableEmitterModule implements DruidModule
@Provides
@ManageLifecycle
public LatchableEmitter makeEmitter(
- @Self DruidNode selfNode,
- ScheduledExecutorFactory executorFactory
+ @Self DruidNode selfNode
)
{
- return new LatchableEmitter(selfNode.getServiceName(), selfNode.getHost(),
executorFactory);
+ return new LatchableEmitter(selfNode.getServiceName(), selfNode.getHost());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]