This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 18d2a8957f6 Refactor: Cleanup test impls of ServiceEmitter (#15683)
18d2a8957f6 is described below
commit 18d2a8957f64e04cb82d1ae2a14017094dad9db8
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon Jan 15 17:37:00 2024 +0530
Refactor: Cleanup test impls of ServiceEmitter (#15683)
---
.../kafka/supervisor/KafkaSupervisorTest.java | 24 ++++----
.../kinesis/supervisor/KinesisSupervisorTest.java | 28 ++++-----
.../indexing/common/task/CompactionTaskTest.java | 20 +++---
.../druid/indexing/common/task/IndexTaskTest.java | 36 ++---------
.../AbstractParallelIndexSupervisorTaskTest.java | 5 +-
.../java/util/metrics/StubServiceEmitter.java | 5 ++
.../org/apache/druid/query/CachingEmitter.java | 54 ----------------
.../druid/query/DefaultQueryMetricsTest.java | 69 +++++++--------------
.../groupby/DefaultGroupByQueryMetricsTest.java | 15 ++---
.../search/DefaultSearchQueryMetricsTest.java | 15 ++---
.../DefaultTimeseriesQueryMetricsTest.java | 15 ++---
.../query/topn/DefaultTopNQueryMetricsTest.java | 15 ++---
.../metrics/ExceptionCapturingServiceEmitter.java | 72 ----------------------
13 files changed, 91 insertions(+), 282 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index b221cdf418c..21221185e76 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -75,12 +75,14 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.AlertBuilder;
+import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
@@ -90,7 +92,6 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.FireDepartment;
-import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
@@ -166,7 +167,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
private TaskQueue taskQueue;
private String topic;
private RowIngestionMetersFactory rowIngestionMetersFactory;
- private ExceptionCapturingServiceEmitter serviceEmitter;
+ private StubServiceEmitter serviceEmitter;
private SupervisorStateManagerConfig supervisorConfig;
private KafkaSupervisorIngestionSpec ingestionSchema;
@@ -222,7 +223,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
topic = getTopic();
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
- serviceEmitter = new ExceptionCapturingServiceEmitter();
+ serviceEmitter = new StubServiceEmitter("KafkaSupervisorTest", "localhost");
EmittingLogger.registerEmitter(serviceEmitter);
supervisorConfig = new SupervisorStateManagerConfig();
ingestionSchema = EasyMock.createMock(KafkaSupervisorIngestionSpec.class);
@@ -3340,9 +3341,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
verifyAll();
- Assert.assertNull(serviceEmitter.getStackTrace(), serviceEmitter.getStackTrace());
- Assert.assertNull(serviceEmitter.getExceptionMessage(), serviceEmitter.getExceptionMessage());
- Assert.assertNull(serviceEmitter.getExceptionClass());
+ Assert.assertTrue(serviceEmitter.getAlerts().isEmpty());
}
@Test(timeout = 60_000L)
@@ -3426,18 +3425,19 @@ public class KafkaSupervisorTest extends EasyMockSupport
verifyAll();
- while (serviceEmitter.getStackTrace() == null) {
+ while (serviceEmitter.getAlerts().isEmpty()) {
Thread.sleep(100);
}
- Assert.assertTrue(
- serviceEmitter.getStackTrace().startsWith("org.apache.druid.java.util.common.ISE: Cannot find")
+ AlertEvent alert = serviceEmitter.getAlerts().get(0);
+ Assert.assertEquals(
+ "SeekableStreamSupervisor[testDS] failed to handle notice",
+ alert.getDescription()
);
Assert.assertEquals(
"Cannot find taskGroup [0] among all activelyReadingTaskGroups [{}]",
- serviceEmitter.getExceptionMessage()
+ alert.getDataMap().get(AlertBuilder.EXCEPTION_MESSAGE_KEY)
);
- Assert.assertEquals(ISE.class.getName(), serviceEmitter.getExceptionClass());
}
@Test
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index d907a372d7e..f38697c8180 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -71,12 +71,14 @@ import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.AlertBuilder;
+import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -86,7 +88,6 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.FireDepartment;
-import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
@@ -97,8 +98,6 @@ import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.easymock.IAnswer;
-import org.hamcrest.CoreMatchers;
-import org.hamcrest.MatcherAssert;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.After;
@@ -153,7 +152,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
private SeekableStreamIndexTaskClient<String, String> taskClient;
private TaskQueue taskQueue;
private RowIngestionMetersFactory rowIngestionMetersFactory;
- private ExceptionCapturingServiceEmitter serviceEmitter;
+ private StubServiceEmitter serviceEmitter;
private SupervisorStateManagerConfig supervisorConfig;
public KinesisSupervisorTest()
@@ -213,7 +212,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
null
);
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
- serviceEmitter = new ExceptionCapturingServiceEmitter();
+ serviceEmitter = new StubServiceEmitter("KinesisSupervisorTest", "localhost");
EmittingLogger.registerEmitter(serviceEmitter);
supervisorConfig = new SupervisorStateManagerConfig();
}
@@ -3317,9 +3316,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
verifyAll();
- Assert.assertNull(serviceEmitter.getStackTrace(), serviceEmitter.getStackTrace());
- Assert.assertNull(serviceEmitter.getExceptionMessage(), serviceEmitter.getExceptionMessage());
- Assert.assertNull(serviceEmitter.getExceptionClass());
+ Assert.assertTrue(serviceEmitter.getAlerts().isEmpty());
}
@Test(timeout = 60_000L)
@@ -3443,20 +3440,19 @@ public class KinesisSupervisorTest extends EasyMockSupport
verifyAll();
- while (serviceEmitter.getStackTrace() == null) {
+ while (serviceEmitter.getAlerts().isEmpty()) {
Thread.sleep(100);
}
- MatcherAssert.assertThat(
- serviceEmitter.getStackTrace(),
- CoreMatchers.startsWith("org.apache.druid.java.util.common.ISE: Cannot find taskGroup")
+ final AlertEvent alert = serviceEmitter.getAlerts().get(0);
+ Assert.assertEquals(
+ "SeekableStreamSupervisor[testDS] failed to handle notice",
+ alert.getDescription()
);
-
Assert.assertEquals(
"Cannot find taskGroup [0] among all activelyReadingTaskGroups [{}]",
- serviceEmitter.getExceptionMessage()
+ alert.getDataMap().get(AlertBuilder.EXCEPTION_MESSAGE_KEY)
);
- Assert.assertEquals(ISE.class.getName(), serviceEmitter.getExceptionClass());
}
@Test
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index c7f6168d85d..321ad5db3fd 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -88,7 +88,7 @@ import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
-import org.apache.druid.query.CachingEmitter;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
@@ -375,18 +375,17 @@ public class CompactionTaskTest
@Mock
private Clock clock;
- private CachingEmitter emitter;
+ private StubServiceEmitter emitter;
@Before
public void setup()
{
final IndexIO testIndexIO = new TestIndexIO(OBJECT_MAPPER, SEGMENT_MAP);
- emitter = new CachingEmitter();
+ emitter = new StubServiceEmitter();
toolbox = makeTaskToolbox(
new TestTaskActionClient(new ArrayList<>(SEGMENT_MAP.keySet())),
testIndexIO,
- SEGMENT_MAP,
- emitter
+ SEGMENT_MAP
);
Mockito.when(clock.millis()).thenReturn(0L, 10_000L);
segmentCacheManagerFactory = new SegmentCacheManagerFactory(OBJECT_MAPPER);
@@ -1551,9 +1550,7 @@ public class CompactionTaskTest
new PeriodGranularity(Period.months(3), null, null),
BatchIOConfig.DEFAULT_DROP_EXISTING
);
- Assert.assertEquals(10_000L, emitter.getLastEmittedEvent().toMap().get("value"));
- Assert.assertEquals("compact/segmentAnalyzer/fetchAndProcessMillis", emitter.getLastEmittedEvent().toMap().get("metric"));
- Assert.assertEquals("metrics", emitter.getLastEmittedEvent().getFeed());
+ emitter.verifyValue("compact/segmentAnalyzer/fetchAndProcessMillis", 10_000L);
}
@Test
@@ -1929,11 +1926,10 @@ public class CompactionTaskTest
}
}
- private static TaskToolbox makeTaskToolbox(
+ private TaskToolbox makeTaskToolbox(
TaskActionClient taskActionClient,
IndexIO indexIO,
- Map<DataSegment, File> segments,
- CachingEmitter emitter
+ Map<DataSegment, File> segments
)
{
final SegmentCacheManager segmentCacheManager = new NoopSegmentCacheManager()
@@ -1974,7 +1970,7 @@ public class CompactionTaskTest
.segmentCacheManager(segmentCacheManager)
.taskLogPusher(null)
.attemptId("1")
- .emitter(new ServiceEmitter("service", "host", emitter))
+ .emitter(emitter)
.build();
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 1ade29d0d8a..a172753ed92 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -61,8 +61,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
-import org.apache.druid.java.util.emitter.core.Event;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
@@ -130,8 +129,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
@RunWith(Parameterized.class)
public class IndexTaskTest extends IngestionTestBase
@@ -1368,13 +1365,10 @@ public class IndexTaskTest extends IngestionTestBase
}
@Test
- public void testWaitForSegmentAvailabilityEmitsExpectedMetric() throws IOException, InterruptedException
+ public void testWaitForSegmentAvailabilityEmitsExpectedMetric() throws IOException
{
final File tmpDir = temporaryFolder.newFolder();
- LatchableServiceEmitter latchEmitter = new LatchableServiceEmitter();
- latchEmitter.latch = new CountDownLatch(1);
-
TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class);
DataSegment mockDataSegment1 = EasyMock.createMock(DataSegment.class);
@@ -1414,8 +1408,9 @@ public class IndexTaskTest extends IngestionTestBase
EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory())
.andReturn(new NoopSegmentHandoffNotifierFactory())
.once();
+ final StubServiceEmitter emitter = new StubServiceEmitter("IndexTaskTest", "localhost");
EasyMock.expect(mockToolbox.getEmitter())
- .andReturn(latchEmitter).anyTimes();
+ .andReturn(emitter).anyTimes();
EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once();
@@ -1423,7 +1418,7 @@ public class IndexTaskTest extends IngestionTestBase
EasyMock.replay(mockDataSegment1, mockDataSegment2);
Assert.assertTrue(indexTask.waitForSegmentAvailability(mockToolbox, segmentsToWaitFor, 30000));
- latchEmitter.latch.await(300000, TimeUnit.MILLISECONDS);
+ emitter.verifyEmitted("task/segmentAvailability/wait/time", 1);
EasyMock.verify(mockToolbox);
EasyMock.verify(mockDataSegment1, mockDataSegment2);
}
@@ -3026,27 +3021,6 @@ public class IndexTaskTest extends IngestionTestBase
}
}
- /**
- * Used to test that expected metric is emitted by AbstractBatchIndexTask#waitForSegmentAvailability
- */
- private static class LatchableServiceEmitter extends ServiceEmitter
- {
- private CountDownLatch latch;
-
- private LatchableServiceEmitter()
- {
- super("", "", null);
- }
-
- @Override
- public void emit(Event event)
- {
- if (latch != null && "task/segmentAvailability/wait/time".equals(event.toMap().get("metric"))) {
- latch.countDown();
- }
- }
- }
-
@Test
public void testEqualsAndHashCode()
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 480a37c5723..0f165912e9a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -76,10 +76,9 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.EntryExistsException;
-import org.apache.druid.query.CachingEmitter;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
@@ -711,7 +710,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
.shuffleClient(new LocalShuffleClient(intermediaryDataManager))
.taskLogPusher(null)
.attemptId("1")
- .emitter(new ServiceEmitter("service", "host", new CachingEmitter()))
+ .emitter(new StubServiceEmitter())
.build();
}
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
index 40826af8047..2ddba7c6cd8 100644
--- a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
+++ b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
@@ -40,6 +40,11 @@ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifie
private final List<AlertEvent> alertEvents = new ArrayList<>();
private final Map<String, List<ServiceMetricEvent>> metricEvents = new HashMap<>();
+ public StubServiceEmitter()
+ {
+ super("testing", "localhost", null);
+ }
+
public StubServiceEmitter(String service, String host)
{
super(service, host, null);
diff --git a/processing/src/test/java/org/apache/druid/query/CachingEmitter.java b/processing/src/test/java/org/apache/druid/query/CachingEmitter.java
deleted file mode 100644
index 11014328537..00000000000
--- a/processing/src/test/java/org/apache/druid/query/CachingEmitter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query;
-
-import org.apache.druid.java.util.emitter.core.Emitter;
-import org.apache.druid.java.util.emitter.core.Event;
-
-public class CachingEmitter implements Emitter
-{
- private Event lastEmittedEvent;
-
- public Event getLastEmittedEvent()
- {
- return lastEmittedEvent;
- }
-
- @Override
- public void start()
- {
- }
-
- @Override
- public void emit(Event event)
- {
- lastEmittedEvent = event;
- }
-
- @Override
- public void flush()
- {
- }
-
- @Override
- public void close()
- {
- }
-}
diff --git a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
index d512a294ef8..f2fbdc1eb98 100644
--- a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
+++ b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
@@ -22,13 +22,14 @@ package org.apache.druid.query;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.ListFilteredDimensionSpec;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNQueryBuilder;
+import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
@@ -37,7 +38,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-public class DefaultQueryMetricsTest
+public class DefaultQueryMetricsTest extends InitializedNullHandlingTest
{
/**
@@ -47,8 +48,7 @@ public class DefaultQueryMetricsTest
@Test
public void testDefaultQueryMetricsQuery()
{
- CachingEmitter cachingEmitter = new CachingEmitter();
- ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter);
+ final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", "");
DefaultQueryMetrics<Query<?>> queryMetrics = new DefaultQueryMetrics<>();
TopNQuery query = new TopNQueryBuilder()
.dataSource("xx")
@@ -71,7 +71,7 @@ public class DefaultQueryMetricsTest
// This change is done to keep the code coverage tool happy by exercising the implementation
queryMetrics.sqlQueryId("dummy");
queryMetrics.queryId("dummy");
- Map<String, Object> actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
+ Map<String, Object> actualEvent = serviceEmitter.getEvents().get(0).toMap();
Assert.assertEquals(13, actualEvent.size());
Assert.assertTrue(actualEvent.containsKey("feed"));
Assert.assertTrue(actualEvent.containsKey("timestamp"));
@@ -94,82 +94,59 @@ public class DefaultQueryMetricsTest
@Test
public void testDefaultQueryMetricsMetricNamesAndUnits()
{
- CachingEmitter cachingEmitter = new CachingEmitter();
- ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter);
DefaultQueryMetrics<Query<?>> queryMetrics = new DefaultQueryMetrics<>();
- testQueryMetricsDefaultMetricNamesAndUnits(cachingEmitter, serviceEmitter, queryMetrics);
+ testQueryMetricsDefaultMetricNamesAndUnits(queryMetrics);
}
public static void testQueryMetricsDefaultMetricNamesAndUnits(
- CachingEmitter cachingEmitter,
- ServiceEmitter serviceEmitter,
QueryMetrics<? extends Query<?>> queryMetrics
)
{
+ final StubServiceEmitter serviceEmitter = new StubServiceEmitter();
queryMetrics.reportQueryTime(1000001).emit(serviceEmitter);
- Map<String, Object> actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
- Assert.assertEquals("query/time", actualEvent.get("metric"));
// query/time and most metrics below are measured in milliseconds by default
- Assert.assertEquals(1L, actualEvent.get("value"));
+ serviceEmitter.verifyValue("query/time", 1L);
queryMetrics.reportWaitTime(2000001).emit(serviceEmitter);
- actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
- Assert.assertEquals("query/wait/time", actualEvent.get("metric"));
- Assert.assertEquals(2L, actualEvent.get("value"));
+ serviceEmitter.verifyValue("query/wait/time", 2L);
queryMetrics.reportSegmentTime(3000001).emit(serviceEmitter);
- actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
- Assert.assertEquals("query/segment/time", actualEvent.get("metric"));
- Assert.assertEquals(3L, actualEvent.get("value"));
+ serviceEmitter.verifyValue("query/segment/time", 3L);
queryMetrics.reportSegmentAndCacheTime(4000001).emit(serviceEmitter);
- actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
- Assert.assertEquals("query/segmentAndCache/time", actualEvent.get("metric"));
- Assert.assertEquals(4L, actualEvent.get("value"));
+ serviceEmitter.verifyValue("query/segmentAndCache/time", 4L);
- queryMetrics.reportCpuTime(6000001).emit(serviceEmitter);
- actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
- Assert.assertEquals("query/cpu/time", actualEvent.get("metric"));
// CPU time is measured in microseconds by default
- Assert.assertEquals(6000L, actualEvent.get("value"));
+ queryMetrics.reportCpuTime(6000001).emit(serviceEmitter);
+ serviceEmitter.verifyValue("query/cpu/time", 6000L);
queryMetrics.reportNodeTimeToFirstByte(7000001).emit(serviceEmitter);
- actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
- Assert.assertEquals("query/node/ttfb", actualEvent.get("metric"));
- Assert.assertEquals(7L, actualEvent.get("value"));
+ serviceEmitter.verifyValue("query/node/ttfb", 7L);
queryMetrics.reportNodeTime(8000001).emit(serviceEmitter);
- actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
- Assert.assertEquals("query/node/time", actualEvent.get("metric"));
- Assert.assertEquals(8L, actualEvent.get("value"));
+ serviceEmitter.verifyValue("query/node/time", 8L);
queryMetrics.reportQueryBytes(9).emit(serviceEmitter);
- actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
- Assert.assertEquals("query/bytes", actualEvent.get("metric"));
- Assert.assertEquals(9L, actualEvent.get("value"));
+ serviceEmitter.verifyValue("query/bytes", 9L);
queryMetrics.reportNodeBytes(10).emit(serviceEmitter);
- actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
- Assert.assertEquals("query/node/bytes", actualEvent.get("metric"));
- Assert.assertEquals(10L, actualEvent.get("value"));
+ serviceEmitter.verifyValue("query/node/bytes", 10L);
+ Assert.assertEquals(9, serviceEmitter.getEvents().size());
- // Here we are testing that Queried Segment Count does not get emitted by the DefaultQueryMetrics and the last
- // metric remains as query/node/bytes
+ // Verify that Queried Segment Count does not get emitted by the DefaultQueryMetrics
+ // and the total number of emitted metrics remains unchanged
queryMetrics.reportQueriedSegmentCount(25).emit(serviceEmitter);
- actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
- Assert.assertEquals("query/node/bytes", actualEvent.get("metric"));
- Assert.assertEquals(10L, actualEvent.get("value"));
+ Assert.assertEquals(9, serviceEmitter.getEvents().size());
}
@Test
public void testVectorizedDimensionInMetrics()
{
- CachingEmitter cachingEmitter = new CachingEmitter();
- ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter);
+ final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", "");
DefaultQueryMetrics<Query<?>> queryMetrics = new DefaultQueryMetrics<>();
queryMetrics.vectorized(true);
queryMetrics.reportSegmentTime(0).emit(serviceEmitter);
- Map<String, Object> actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
+ Map<String, Object> actualEvent = serviceEmitter.getEvents().get(0).toMap();
Assert.assertEquals(7, actualEvent.size());
Assert.assertTrue(actualEvent.containsKey("feed"));
Assert.assertTrue(actualEvent.containsKey("timestamp"));
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/DefaultGroupByQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/groupby/DefaultGroupByQueryMetricsTest.java
index bc2109e0c27..80c0ad40588 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/DefaultGroupByQueryMetricsTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/DefaultGroupByQueryMetricsTest.java
@@ -22,8 +22,7 @@ package org.apache.druid.query.groupby;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.query.CachingEmitter;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.DefaultQueryMetricsTest;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.QueryContexts;
@@ -33,6 +32,7 @@ import org.apache.druid.query.dimension.ExtractionDimensionSpec;
import org.apache.druid.query.extraction.MapLookupExtractor;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.lookup.LookupExtractionFn;
+import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
@@ -41,7 +41,7 @@ import org.junit.Test;
import java.util.Collections;
import java.util.Map;
-public class DefaultGroupByQueryMetricsTest
+public class DefaultGroupByQueryMetricsTest extends InitializedNullHandlingTest
{
/**
@@ -51,8 +51,7 @@ public class DefaultGroupByQueryMetricsTest
@Test
public void testDefaultGroupByQueryMetricsQuery()
{
- CachingEmitter cachingEmitter = new CachingEmitter();
- ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter);
+ final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", "");
DefaultGroupByQueryMetrics queryMetrics = new DefaultGroupByQueryMetrics();
GroupByQuery.Builder builder = GroupByQuery
.builder()
@@ -75,7 +74,7 @@ public class DefaultGroupByQueryMetricsTest
queryMetrics.query(query);
queryMetrics.reportQueryTime(0).emit(serviceEmitter);
- Map<String, Object> actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
+ Map<String, Object> actualEvent = serviceEmitter.getEvents().get(0).toMap();
Assert.assertEquals(16, actualEvent.size());
Assert.assertTrue(actualEvent.containsKey("feed"));
Assert.assertTrue(actualEvent.containsKey("timestamp"));
@@ -103,9 +102,7 @@ public class DefaultGroupByQueryMetricsTest
@Test
public void testDefaultGroupByQueryMetricsMetricNamesAndUnits()
{
- CachingEmitter cachingEmitter = new CachingEmitter();
- ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter);
DefaultGroupByQueryMetrics queryMetrics = new DefaultGroupByQueryMetrics();
- DefaultQueryMetricsTest.testQueryMetricsDefaultMetricNamesAndUnits(cachingEmitter, serviceEmitter, queryMetrics);
+ DefaultQueryMetricsTest.testQueryMetricsDefaultMetricNamesAndUnits(queryMetrics);
}
}
diff --git a/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java
index a03781566b5..f29f93ff862 100644
--- a/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java
+++ b/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java
@@ -22,14 +22,14 @@ package org.apache.druid.query.search;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.query.CachingEmitter;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.DefaultQueryMetricsTest;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.ListFilteredDimensionSpec;
+import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
@@ -38,7 +38,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-public class DefaultSearchQueryMetricsTest
+public class DefaultSearchQueryMetricsTest extends InitializedNullHandlingTest
{
/**
@@ -47,8 +47,7 @@ public class DefaultSearchQueryMetricsTest
@Test
public void testDefaultSearchQueryMetricsQuery()
{
- CachingEmitter cachingEmitter = new CachingEmitter();
- ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter);
+ final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", "");
SearchQuery query = Druids
.newSearchQueryBuilder()
.dataSource(QueryRunnerTestHelper.DATA_SOURCE)
@@ -67,7 +66,7 @@ public class DefaultSearchQueryMetricsTest
queryMetrics.query(query);
queryMetrics.reportQueryTime(0).emit(serviceEmitter);
- Map<String, Object> actualEvent =
cachingEmitter.getLastEmittedEvent().toMap();
+ Map<String, Object> actualEvent =
serviceEmitter.getEvents().get(0).toMap();
Assert.assertEquals(13, actualEvent.size());
Assert.assertTrue(actualEvent.containsKey("feed"));
Assert.assertTrue(actualEvent.containsKey("timestamp"));
@@ -101,9 +100,7 @@ public class DefaultSearchQueryMetricsTest
.intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
.build();
- CachingEmitter cachingEmitter = new CachingEmitter();
- ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter);
SearchQueryMetrics queryMetrics =
DefaultSearchQueryMetricsFactory.instance().makeMetrics(query);
-
DefaultQueryMetricsTest.testQueryMetricsDefaultMetricNamesAndUnits(cachingEmitter,
serviceEmitter, queryMetrics);
+
DefaultQueryMetricsTest.testQueryMetricsDefaultMetricNamesAndUnits(queryMetrics);
}
}
diff --git
a/processing/src/test/java/org/apache/druid/query/timeseries/DefaultTimeseriesQueryMetricsTest.java
b/processing/src/test/java/org/apache/druid/query/timeseries/DefaultTimeseriesQueryMetricsTest.java
index 6f2c45027cf..61b9dc5edba 100644
---
a/processing/src/test/java/org/apache/druid/query/timeseries/DefaultTimeseriesQueryMetricsTest.java
+++
b/processing/src/test/java/org/apache/druid/query/timeseries/DefaultTimeseriesQueryMetricsTest.java
@@ -20,12 +20,12 @@
package org.apache.druid.query.timeseries;
import com.google.common.collect.ImmutableMap;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.query.CachingEmitter;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.DefaultQueryMetricsTest;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryRunnerTestHelper;
+import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
@@ -34,7 +34,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-public class DefaultTimeseriesQueryMetricsTest
+public class DefaultTimeseriesQueryMetricsTest extends
InitializedNullHandlingTest
{
/**
@@ -44,8 +44,7 @@ public class DefaultTimeseriesQueryMetricsTest
@Test
public void testDefaultTimeseriesQueryMetricsQuery()
{
- CachingEmitter cachingEmitter = new CachingEmitter();
- ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter);
+ final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", "");
DefaultTimeseriesQueryMetrics queryMetrics = new
DefaultTimeseriesQueryMetrics();
TimeseriesQuery query = Druids
.newTimeseriesQueryBuilder()
@@ -60,7 +59,7 @@ public class DefaultTimeseriesQueryMetricsTest
queryMetrics.query(query);
queryMetrics.reportQueryTime(0).emit(serviceEmitter);
- Map<String, Object> actualEvent =
cachingEmitter.getLastEmittedEvent().toMap();
+ Map<String, Object> actualEvent =
serviceEmitter.getEvents().get(0).toMap();
Assert.assertEquals(16, actualEvent.size());
Assert.assertTrue(actualEvent.containsKey("feed"));
Assert.assertTrue(actualEvent.containsKey("timestamp"));
@@ -89,9 +88,7 @@ public class DefaultTimeseriesQueryMetricsTest
@Test
public void testDefaultTimeseriesQueryMetricsMetricNamesAndUnits()
{
- CachingEmitter cachingEmitter = new CachingEmitter();
- ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter);
DefaultTimeseriesQueryMetrics queryMetrics = new
DefaultTimeseriesQueryMetrics();
-
DefaultQueryMetricsTest.testQueryMetricsDefaultMetricNamesAndUnits(cachingEmitter,
serviceEmitter, queryMetrics);
+
DefaultQueryMetricsTest.testQueryMetricsDefaultMetricNamesAndUnits(queryMetrics);
}
}
diff --git
a/processing/src/test/java/org/apache/druid/query/topn/DefaultTopNQueryMetricsTest.java
b/processing/src/test/java/org/apache/druid/query/topn/DefaultTopNQueryMetricsTest.java
index 19f561afe44..e232e6a6405 100644
---
a/processing/src/test/java/org/apache/druid/query/topn/DefaultTopNQueryMetricsTest.java
+++
b/processing/src/test/java/org/apache/druid/query/topn/DefaultTopNQueryMetricsTest.java
@@ -22,8 +22,7 @@ package org.apache.druid.query.topn;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.query.CachingEmitter;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.DefaultQueryMetricsTest;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.QueryRunnerTestHelper;
@@ -31,6 +30,7 @@ import
org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.ListFilteredDimensionSpec;
import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
@@ -39,7 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-public class DefaultTopNQueryMetricsTest
+public class DefaultTopNQueryMetricsTest extends InitializedNullHandlingTest
{
/**
@@ -49,8 +49,7 @@ public class DefaultTopNQueryMetricsTest
@Test
public void testDefaultTopNQueryMetricsQuery()
{
- CachingEmitter cachingEmitter = new CachingEmitter();
- ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter);
+ final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", "");
DefaultTopNQueryMetrics queryMetrics = new DefaultTopNQueryMetrics();
TopNQuery query = new TopNQueryBuilder()
.dataSource("xx")
@@ -70,7 +69,7 @@ public class DefaultTopNQueryMetricsTest
queryMetrics.query(query);
queryMetrics.reportQueryTime(0).emit(serviceEmitter);
- Map<String, Object> actualEvent =
cachingEmitter.getLastEmittedEvent().toMap();
+ Map<String, Object> actualEvent =
serviceEmitter.getEvents().get(0).toMap();
Assert.assertEquals(17, actualEvent.size());
Assert.assertTrue(actualEvent.containsKey("feed"));
Assert.assertTrue(actualEvent.containsKey("timestamp"));
@@ -101,9 +100,7 @@ public class DefaultTopNQueryMetricsTest
@Test
public void testDefaultTopNQueryMetricsMetricNamesAndUnits()
{
- CachingEmitter cachingEmitter = new CachingEmitter();
- ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter);
DefaultTopNQueryMetrics queryMetrics = new DefaultTopNQueryMetrics();
-
DefaultQueryMetricsTest.testQueryMetricsDefaultMetricNamesAndUnits(cachingEmitter,
serviceEmitter, queryMetrics);
+
DefaultQueryMetricsTest.testQueryMetricsDefaultMetricNamesAndUnits(queryMetrics);
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/metrics/ExceptionCapturingServiceEmitter.java
b/server/src/test/java/org/apache/druid/server/metrics/ExceptionCapturingServiceEmitter.java
deleted file mode 100644
index 8a80b3df3e5..00000000000
---
a/server/src/test/java/org/apache/druid/server/metrics/ExceptionCapturingServiceEmitter.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.server.metrics;
-
-import org.apache.druid.java.util.emitter.core.Event;
-import org.apache.druid.java.util.emitter.service.AlertBuilder;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-
-import javax.annotation.Nullable;
-import java.util.Map;
-
-public class ExceptionCapturingServiceEmitter extends ServiceEmitter
-{
- private volatile String exceptionClass;
- private volatile String exceptionMessage;
- private volatile String stackTrace;
-
- public ExceptionCapturingServiceEmitter()
- {
- super("", "", null);
- }
-
- @Override
- public void emit(Event event)
- {
- //noinspection unchecked
- final Map<String, Object> dataMap = (Map<String, Object>)
event.toMap().get("data");
- final String exceptionClass = (String)
dataMap.get(AlertBuilder.EXCEPTION_TYPE_KEY);
- if (exceptionClass != null) {
- final String exceptionMessage = (String)
dataMap.get(AlertBuilder.EXCEPTION_MESSAGE_KEY);
- final String stackTrace = (String)
dataMap.get(AlertBuilder.EXCEPTION_STACK_TRACE_KEY);
- this.exceptionClass = exceptionClass;
- this.exceptionMessage = exceptionMessage;
- this.stackTrace = stackTrace;
- }
- }
-
- @Nullable
- public String getExceptionClass()
- {
- return exceptionClass;
- }
-
- @Nullable
- public String getExceptionMessage()
- {
- return exceptionMessage;
- }
-
- @Nullable
- public String getStackTrace()
- {
- return stackTrace;
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]