This is an automated email from the ASF dual-hosted git repository. abhishekrb pushed a commit to branch peon_dimension_service_emitter in repository https://gitbox.apache.org/repos/asf/druid.git
commit 4cf67157ff5a0d4d9a4374127fc1439ceb42a69e Author: Abhishek Balaji Radhakrishnan <[email protected]> AuthorDate: Tue Dec 30 10:35:35 2025 -0500 Review comments --- .../java/util/emitter/service/ServiceEmitter.java | 2 +- .../apache/druid/java/util/metrics/TaskHolder.java | 6 +++--- .../java/util/metrics/CgroupCpuMonitorTest.java | 6 +++--- .../java/util/metrics/CgroupCpuSetMonitorTest.java | 4 ++-- .../java/util/metrics/CgroupDiskMonitorTest.java | 2 +- .../java/util/metrics/CgroupV2CpuMonitorTest.java | 2 +- .../java/util/metrics/CgroupV2DiskMonitorTest.java | 2 +- .../java/util/metrics/StubServiceEmitter.java | 16 +++++++------- .../druid/query/CPUTimeMetricQueryRunnerTest.java | 2 +- .../druid/query/DefaultQueryMetricsTest.java | 12 +++++------ .../groupby/DefaultGroupByQueryMetricsTest.java | 6 +++--- .../search/DefaultSearchQueryMetricsTest.java | 6 +++--- .../DefaultTimeseriesQueryMetricsTest.java | 6 +++--- .../query/topn/DefaultTopNQueryMetricsTest.java | 6 +++--- .../druid/server/ClientQuerySegmentWalkerTest.java | 2 +- .../org/apache/druid/server/QueryResourceTest.java | 2 +- .../server/coordinator/duty/RunRulesTest.java | 2 +- .../druid/server/emitter/EmitterModuleTest.java | 24 ++++++++++----------- .../jetty/JettyServerModuleTest.java | 3 +-- .../server/metrics/GroupByStatsMonitorTest.java | 25 ++++++++++++---------- .../druid/server/metrics/LatchableEmitter.java | 5 +++-- .../java/org/apache/druid/cli/PeonTaskHolder.java | 11 +++++----- .../server/AsyncQueryForwardingServletTest.java | 8 +++---- .../embedded/emitter/LatchableEmitterModule.java | 6 ++++-- .../org/apache/druid/sql/http/SqlResourceTest.java | 2 +- 25 files changed, 87 insertions(+), 81 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceEmitter.java b/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceEmitter.java index b1f92d09629..d5e241df5a4 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceEmitter.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceEmitter.java @@ -32,7 +32,7 @@ import java.io.IOException; public class ServiceEmitter implements Emitter { - protected final Emitter emitter; + private final Emitter emitter; private final String service; private final ImmutableMap<String, String> otherServiceDimensions; private final String host; diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/TaskHolder.java b/processing/src/main/java/org/apache/druid/java/util/metrics/TaskHolder.java index a9b64d9099b..9f0f11077bc 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/TaskHolder.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/TaskHolder.java @@ -24,7 +24,7 @@ import java.util.Map; /** * Provides identifying information for a task. Implementations return {@code null} - * when used in server processes that are not {@code CliPeon}. Note that t + * when used in server processes that are not {@code CliPeon}. */ public interface TaskHolder { @@ -41,13 +41,13 @@ public interface TaskHolder String getTaskId(); /** - * @return the taskId, or {@code null} if called from a server that is not {@code CliPeon}. + * @return the type name of this task, or {@code null} if called from a server that is not {@code CliPeon}. */ @Nullable String getTaskType(); /** - * @return the taskId, or {@code null} if called from a server that is not {@code CliPeon}. + * @return the group ID of this task, or {@code null} if called from a server that is not {@code CliPeon}. */ @Nullable String getGroupId(); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuMonitorTest.java index 2ecf91d1dce..b8f4bd28882 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuMonitorTest.java @@ -76,7 +76,7 @@ public class CgroupCpuMonitorTest public void testMonitor() throws IOException, InterruptedException { final CgroupCpuMonitor monitor = new CgroupCpuMonitor(discoverer, "some_feed"); - final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + final StubServiceEmitter emitter = StubServiceEmitter.createStarted(); Assert.assertTrue(monitor.doMonitor(emitter)); final List<Event> actualEvents = emitter.getEvents(); Assert.assertEquals(2, actualEvents.size()); @@ -132,7 +132,7 @@ public class CgroupCpuMonitorTest // Constructor should detect v2 and log warning CgroupCpuMonitor monitor = new CgroupCpuMonitor(v2Discoverer, "test-feed"); - final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + final StubServiceEmitter emitter = StubServiceEmitter.createStarted(); // doMonitor should return true Assert.assertTrue(monitor.doMonitor(emitter)); @@ -147,7 +147,7 @@ public class CgroupCpuMonitorTest // This test verifies that the existing v1 monitoring continues to work // after the v2 detection changes final CgroupCpuMonitor monitor = new CgroupCpuMonitor(discoverer, "some_feed"); - final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + final StubServiceEmitter emitter = StubServiceEmitter.createStarted(); Assert.assertTrue(monitor.doMonitor(emitter)); final List<Event> actualEvents = emitter.getEvents(); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java index 97cc91303b7..db17161869d 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java @@ -71,7 +71,7 @@ public class CgroupCpuSetMonitorTest public void testMonitor() { final CgroupCpuSetMonitor monitor = new CgroupCpuSetMonitor(discoverer, "some_feed"); - final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + final StubServiceEmitter emitter = StubServiceEmitter.createStarted(); Assert.assertTrue(monitor.doMonitor(emitter)); Assert.assertEquals(4, emitter.getNumEmittedEvents()); @@ -102,7 +102,7 @@ public class CgroupCpuSetMonitorTest // Constructor should detect v2 and log warning CgroupCpuSetMonitor monitor = new CgroupCpuSetMonitor(v2Discoverer, "test-feed"); - final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + final StubServiceEmitter emitter = StubServiceEmitter.createStarted(); // doMonitor should return true but skip actual monitoring Assert.assertTrue(monitor.doMonitor(emitter)); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java index d624a80dd79..ad07a35a600 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java @@ -65,7 +65,7 @@ public class CgroupDiskMonitorTest public void testMonitor() throws IOException { final CgroupDiskMonitor monitor = new CgroupDiskMonitor(discoverer, "some_feed"); - final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + final StubServiceEmitter emitter = StubServiceEmitter.createStarted(); Assert.assertTrue(monitor.doMonitor(emitter)); Assert.assertEquals(0, emitter.getNumEmittedEvents()); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java index 9a32dbbf86a..48544d41a02 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java @@ -61,7 +61,7 @@ public class CgroupV2CpuMonitorTest public void testMonitor() throws IOException, InterruptedException { final CgroupV2CpuMonitor monitor = new CgroupV2CpuMonitor(discoverer); - final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + final StubServiceEmitter emitter = StubServiceEmitter.createStarted(); Assert.assertTrue(monitor.doMonitor(emitter)); Assert.assertEquals(2, emitter.getNumEmittedEvents()); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java index f9a977edd92..119937e47a9 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java @@ -59,7 +59,7 @@ public class CgroupV2DiskMonitorTest public void testMonitor() throws IOException { final CgroupV2DiskMonitor monitor = new CgroupV2DiskMonitor(discoverer); - final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + final StubServiceEmitter emitter = StubServiceEmitter.createStarted(); Assert.assertTrue(monitor.doMonitor(emitter)); Assert.assertEquals(0, emitter.getNumEmittedEvents()); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java index 8c182763bc6..4bcdd58f182 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java @@ -55,23 +55,23 @@ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifie this("testing", "localhost"); } - /** - * Initialize a stub service emitter and auto-{@link #start()} it for test convenience. - */ public StubServiceEmitter(String service, String host) { this(service, host, new NoopTaskHolder()); - super.start(); } - /** - * Initialize a stub service emitter. Tests must explicitly call {@link #start()}. - */ public StubServiceEmitter(String service, String host, TaskHolder taskHolder) { super(service, host, new NoopEmitter(), ImmutableMap.of(), taskHolder); } + public static StubServiceEmitter createStarted() + { + final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter(); + stubServiceEmitter.start(); + return stubServiceEmitter; + } + @Override public void emit(Event event) { @@ -192,7 +192,7 @@ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifie public void close() { try { - emitter.close(); + super.close(); } catch (IOException e) { throw new RuntimeException(e); diff --git a/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java index 32d9852d0c1..b55f19c62be 100644 --- a/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java @@ -41,7 +41,7 @@ public class CPUTimeMetricQueryRunnerTest @Test public void testCpuTimeMetric() { - final StubServiceEmitter emitter = new StubServiceEmitter("s", "h"); + final StubServiceEmitter emitter = StubServiceEmitter.createStarted(); final AtomicLong accumulator = new AtomicLong(); final List<Result<TimeseriesResultValue>> expectedResults = Collections.singletonList( diff --git a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java index ee82f206f79..8cea00215e0 100644 --- a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java +++ b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java @@ -48,7 +48,7 @@ public class DefaultQueryMetricsTest extends InitializedNullHandlingTest @Test public void testDefaultQueryMetricsQuery() { - final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", ""); + final StubServiceEmitter serviceEmitter = StubServiceEmitter.createStarted(); DefaultQueryMetrics<Query<?>> queryMetrics = new DefaultQueryMetrics<>(); TopNQuery query = new TopNQueryBuilder() .dataSource("xx") @@ -75,8 +75,8 @@ public class DefaultQueryMetricsTest extends InitializedNullHandlingTest Assert.assertEquals(13, actualEvent.size()); Assert.assertTrue(actualEvent.containsKey("feed")); Assert.assertTrue(actualEvent.containsKey("timestamp")); - Assert.assertEquals("", actualEvent.get("host")); - Assert.assertEquals("", actualEvent.get("service")); + Assert.assertEquals("localhost", actualEvent.get("host")); + Assert.assertEquals("testing", actualEvent.get("service")); Assert.assertEquals("xx", actualEvent.get(DruidMetrics.DATASOURCE)); Assert.assertEquals(query.getType(), actualEvent.get(DruidMetrics.TYPE)); List<Interval> expectedIntervals = QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC.getIntervals(); @@ -144,7 +144,7 @@ public class DefaultQueryMetricsTest extends InitializedNullHandlingTest @Test public void testVectorizedDimensionInMetrics() { - final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", ""); + final StubServiceEmitter serviceEmitter = StubServiceEmitter.createStarted(); DefaultQueryMetrics<Query<?>> queryMetrics = new DefaultQueryMetrics<>(); queryMetrics.vectorized(true); queryMetrics.reportSegmentTime(0).emit(serviceEmitter); @@ -152,8 +152,8 @@ public class DefaultQueryMetricsTest extends InitializedNullHandlingTest Assert.assertEquals(7, actualEvent.size()); Assert.assertTrue(actualEvent.containsKey("feed")); Assert.assertTrue(actualEvent.containsKey("timestamp")); - Assert.assertEquals("", actualEvent.get("host")); - Assert.assertEquals("", actualEvent.get("service")); + Assert.assertEquals("localhost", actualEvent.get("host")); + Assert.assertEquals("testing", actualEvent.get("service")); Assert.assertEquals("query/segment/time", actualEvent.get("metric")); Assert.assertEquals(0L, actualEvent.get("value")); Assert.assertEquals(true, actualEvent.get("vectorized")); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/DefaultGroupByQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/groupby/DefaultGroupByQueryMetricsTest.java index 80c0ad40588..66f89a92cec 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/DefaultGroupByQueryMetricsTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/DefaultGroupByQueryMetricsTest.java @@ -51,7 +51,7 @@ public class DefaultGroupByQueryMetricsTest extends InitializedNullHandlingTest @Test public void testDefaultGroupByQueryMetricsQuery() { - final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", ""); + final StubServiceEmitter serviceEmitter = StubServiceEmitter.createStarted(); DefaultGroupByQueryMetrics queryMetrics = new DefaultGroupByQueryMetrics(); GroupByQuery.Builder builder = GroupByQuery .builder() @@ -78,8 +78,8 @@ public class DefaultGroupByQueryMetricsTest extends InitializedNullHandlingTest Assert.assertEquals(16, actualEvent.size()); Assert.assertTrue(actualEvent.containsKey("feed")); Assert.assertTrue(actualEvent.containsKey("timestamp")); - Assert.assertEquals("", actualEvent.get("host")); - Assert.assertEquals("", actualEvent.get("service")); + Assert.assertEquals("localhost", actualEvent.get("host")); + Assert.assertEquals("testing", actualEvent.get("service")); Assert.assertEquals(QueryRunnerTestHelper.DATA_SOURCE, actualEvent.get(DruidMetrics.DATASOURCE)); Assert.assertEquals(query.getType(), actualEvent.get(DruidMetrics.TYPE)); Interval expectedInterval = Intervals.of("2011-04-02/2011-04-04"); diff --git a/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java index f29f93ff862..8f613b71a9e 100644 --- a/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java +++ b/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java @@ -47,7 +47,7 @@ public class DefaultSearchQueryMetricsTest extends InitializedNullHandlingTest @Test public void testDefaultSearchQueryMetricsQuery() { - final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", ""); + final StubServiceEmitter serviceEmitter = StubServiceEmitter.createStarted(); SearchQuery query = Druids .newSearchQueryBuilder() .dataSource(QueryRunnerTestHelper.DATA_SOURCE) @@ -70,8 +70,8 @@ public class DefaultSearchQueryMetricsTest extends InitializedNullHandlingTest Assert.assertEquals(13, actualEvent.size()); Assert.assertTrue(actualEvent.containsKey("feed")); Assert.assertTrue(actualEvent.containsKey("timestamp")); - Assert.assertEquals("", actualEvent.get("host")); - Assert.assertEquals("", actualEvent.get("service")); + Assert.assertEquals("localhost", actualEvent.get("host")); + Assert.assertEquals("testing", actualEvent.get("service")); Assert.assertEquals(QueryRunnerTestHelper.DATA_SOURCE, actualEvent.get(DruidMetrics.DATASOURCE)); Assert.assertEquals(query.getType(), actualEvent.get(DruidMetrics.TYPE)); List<Interval> expectedIntervals = QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC.getIntervals(); diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/DefaultTimeseriesQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/DefaultTimeseriesQueryMetricsTest.java index 61b9dc5edba..0b426b08be5 100644 --- a/processing/src/test/java/org/apache/druid/query/timeseries/DefaultTimeseriesQueryMetricsTest.java +++ b/processing/src/test/java/org/apache/druid/query/timeseries/DefaultTimeseriesQueryMetricsTest.java @@ -44,7 +44,7 @@ public class DefaultTimeseriesQueryMetricsTest extends InitializedNullHandlingTe @Test public void testDefaultTimeseriesQueryMetricsQuery() { - final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", ""); + final StubServiceEmitter serviceEmitter = StubServiceEmitter.createStarted(); DefaultTimeseriesQueryMetrics queryMetrics = new DefaultTimeseriesQueryMetrics(); TimeseriesQuery query = Druids .newTimeseriesQueryBuilder() @@ -63,8 +63,8 @@ public class DefaultTimeseriesQueryMetricsTest extends InitializedNullHandlingTe Assert.assertEquals(16, actualEvent.size()); Assert.assertTrue(actualEvent.containsKey("feed")); Assert.assertTrue(actualEvent.containsKey("timestamp")); - Assert.assertEquals("", actualEvent.get("host")); - Assert.assertEquals("", actualEvent.get("service")); + Assert.assertEquals("localhost", actualEvent.get("host")); + Assert.assertEquals("testing", actualEvent.get("service")); Assert.assertEquals(QueryRunnerTestHelper.DATA_SOURCE, actualEvent.get(DruidMetrics.DATASOURCE)); Assert.assertEquals(query.getType(), actualEvent.get(DruidMetrics.TYPE)); List<Interval> expectedIntervals = QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC.getIntervals(); diff --git a/processing/src/test/java/org/apache/druid/query/topn/DefaultTopNQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/topn/DefaultTopNQueryMetricsTest.java index e232e6a6405..074ea10330a 100644 --- a/processing/src/test/java/org/apache/druid/query/topn/DefaultTopNQueryMetricsTest.java +++ b/processing/src/test/java/org/apache/druid/query/topn/DefaultTopNQueryMetricsTest.java @@ -49,7 +49,7 @@ public class DefaultTopNQueryMetricsTest extends InitializedNullHandlingTest @Test public void testDefaultTopNQueryMetricsQuery() { - final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", ""); + final StubServiceEmitter serviceEmitter = StubServiceEmitter.createStarted(); DefaultTopNQueryMetrics queryMetrics = new DefaultTopNQueryMetrics(); TopNQuery query = new TopNQueryBuilder() .dataSource("xx") @@ -73,8 +73,8 @@ public class DefaultTopNQueryMetricsTest extends InitializedNullHandlingTest Assert.assertEquals(17, actualEvent.size()); Assert.assertTrue(actualEvent.containsKey("feed")); Assert.assertTrue(actualEvent.containsKey("timestamp")); - Assert.assertEquals("", actualEvent.get("host")); - Assert.assertEquals("", actualEvent.get("service")); + Assert.assertEquals("localhost", actualEvent.get("host")); + Assert.assertEquals("testing", actualEvent.get("service")); Assert.assertEquals("xx", actualEvent.get(DruidMetrics.DATASOURCE)); Assert.assertEquals(query.getType(), actualEvent.get(DruidMetrics.TYPE)); List<Interval> expectedIntervals = QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC.getIntervals(); diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java index 021227a3ca3..b374bbb820f 100644 --- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java @@ -221,7 +221,7 @@ public class ClientQuerySegmentWalkerTest private Closer closer; private QueryRunnerFactoryConglomerate conglomerate; - private final StubServiceEmitter emitter = new StubServiceEmitter(); + private final StubServiceEmitter emitter = StubServiceEmitter.createStarted(); // Queries that are issued; checked by "testQuery" against its "expectedQueries" parameter. private final List<ExpectedQuery> issuedQueries = new ArrayList<>(); diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java index 72c895e46a0..b1ff73cace0 100644 --- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java @@ -258,7 +258,7 @@ public class QueryResourceTest queryScheduler = QueryStackTests.DEFAULT_NOOP_SCHEDULER; testRequestLogger = new TestRequestLogger(); - emitter = new StubServiceEmitter(); + emitter = StubServiceEmitter.createStarted(); queryResource = createQueryResource(ResponseContextConfig.newConfig(true)); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java index f7e9ececc1a..a3042db2df1 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java @@ -96,7 +96,7 @@ public class RunRulesTest public void setUp() { mockPeon = EasyMock.createMock(LoadQueuePeon.class); - emitter = new StubServiceEmitter("coordinator", "host"); + emitter = StubServiceEmitter.createStarted(); EmittingLogger.registerEmitter(emitter); databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class); ruleRunner = new RunRules((ds, set) -> set.size(), databaseRuleManager::getRulesWithDefault); diff --git a/server/src/test/java/org/apache/druid/server/emitter/EmitterModuleTest.java b/server/src/test/java/org/apache/druid/server/emitter/EmitterModuleTest.java index a574852d447..536ff5c34ce 100644 --- a/server/src/test/java/org/apache/druid/server/emitter/EmitterModuleTest.java +++ b/server/src/test/java/org/apache/druid/server/emitter/EmitterModuleTest.java @@ -118,7 +118,7 @@ public class EmitterModuleTest ImmutableSet<NodeRole> nodeRoles = ImmutableSet.of(); - TestTaskHolder testTaskHolder = new TestTaskHolder("d", "e", "a", "w"); + TestTaskHolder testTaskHolder = new TestTaskHolder("wiki", "id1", "type1", "group1"); Injector injector = Guice.createInjector( new JacksonModule(), new LifecycleModule(), @@ -156,16 +156,16 @@ public class EmitterModuleTest Assert.assertEquals(1, events.size()); ServiceMetricEvent event = (ServiceMetricEvent) events.get(0); EventMap map = event.toMap(); - Assert.assertEquals("e", map.get(DruidMetrics.TASK_ID)); - Assert.assertEquals("e", map.get(DruidMetrics.ID)); - Assert.assertEquals("a", map.get(DruidMetrics.TASK_TYPE)); - Assert.assertEquals("w", map.get(DruidMetrics.GROUP_ID)); - Assert.assertEquals("d", map.get(DruidMetrics.DATASOURCE)); + Assert.assertEquals("id1", map.get(DruidMetrics.TASK_ID)); + Assert.assertEquals("id1", map.get(DruidMetrics.ID)); + Assert.assertEquals("type1", map.get(DruidMetrics.TASK_TYPE)); + Assert.assertEquals("group1", map.get(DruidMetrics.GROUP_ID)); + Assert.assertEquals("wiki", map.get(DruidMetrics.DATASOURCE)); stubEmitter.flush(); // Override a dimension and verify that is emitted final ServiceMetricEvent.Builder builder2 = new ServiceMetricEvent.Builder(); - builder2.setDimension("taskId", "xyz"); + builder2.setDimension("taskId", "id2"); builder2.setMetric("metric2", 1); instance.emit(builder2); @@ -173,11 +173,11 @@ public class EmitterModuleTest Assert.assertEquals(1, events2.size()); ServiceMetricEvent event2 = (ServiceMetricEvent) events2.get(0); EventMap map2 = event2.toMap(); - Assert.assertEquals("xyz", map2.get(DruidMetrics.TASK_ID)); - Assert.assertEquals("e", map2.get(DruidMetrics.ID)); - Assert.assertEquals("a", map2.get(DruidMetrics.TASK_TYPE)); - Assert.assertEquals("w", map2.get(DruidMetrics.GROUP_ID)); - Assert.assertEquals("d", map2.get(DruidMetrics.DATASOURCE)); + Assert.assertEquals("id2", map2.get(DruidMetrics.TASK_ID)); + Assert.assertEquals("id1", map2.get(DruidMetrics.ID)); + Assert.assertEquals("type1", map2.get(DruidMetrics.TASK_TYPE)); + Assert.assertEquals("group1", map2.get(DruidMetrics.GROUP_ID)); + Assert.assertEquals("wiki", map2.get(DruidMetrics.DATASOURCE)); } private Injector makeInjectorWithProperties(final Properties props) diff --git a/server/src/test/java/org/apache/druid/server/initialization/jetty/JettyServerModuleTest.java b/server/src/test/java/org/apache/druid/server/initialization/jetty/JettyServerModuleTest.java index 27f81f22c01..317c69e17e2 100644 --- a/server/src/test/java/org/apache/druid/server/initialization/jetty/JettyServerModuleTest.java +++ b/server/src/test/java/org/apache/druid/server/initialization/jetty/JettyServerModuleTest.java @@ -56,8 +56,7 @@ public class JettyServerModuleTest JettyServerModule.JettyMonitor jettyMonitor = new JettyServerModule.JettyMonitor(); - final StubServiceEmitter serviceEmitter = new StubServiceEmitter("service", "host"); - serviceEmitter.start(); + final StubServiceEmitter serviceEmitter = StubServiceEmitter.createStarted(); jettyMonitor.doMonitor(serviceEmitter); serviceEmitter.verifyValue("jetty/numOpenConnections", 0); diff --git a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java index e969cdba7e4..94da5889d39 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java @@ -116,17 +116,21 @@ public class GroupByStatsMonitorTest // Trigger metric emission monitor.doMonitor(emitter); - final Map<String, Object> dimFilters = Map.of(DruidMetrics.DATASOURCE, dataSource, DruidMetrics.TASK_ID, taskId, - DruidMetrics.ID, taskId, DruidMetrics.TASK_TYPE, taskType, DruidMetrics.GROUP_ID, groupId + final Map<String, Object> dimFilters = Map.of( + DruidMetrics.DATASOURCE, dataSource, + DruidMetrics.TASK_ID, taskId, + DruidMetrics.ID, taskId, + DruidMetrics.TASK_TYPE, + taskType, DruidMetrics.GROUP_ID, groupId ); - verifyTaskServiceDimensions(emitter, "mergeBuffer/pendingRequests", dimFilters, 0L); - verifyTaskServiceDimensions(emitter, "mergeBuffer/used", dimFilters, 0L); - verifyTaskServiceDimensions(emitter, "mergeBuffer/queries", dimFilters, 1L); - verifyTaskServiceDimensions(emitter, "mergeBuffer/acquisitionTimeNs", dimFilters, 100L); - verifyTaskServiceDimensions(emitter, "groupBy/spilledQueries", dimFilters, 2L); - verifyTaskServiceDimensions(emitter, "groupBy/spilledBytes", dimFilters, 200L); - verifyTaskServiceDimensions(emitter, "groupBy/mergeDictionarySize", dimFilters, 300L); + verifyMetricValue(emitter, "mergeBuffer/pendingRequests", dimFilters, 0L); + verifyMetricValue(emitter, "mergeBuffer/used", dimFilters, 0L); + verifyMetricValue(emitter, "mergeBuffer/queries", dimFilters, 1L); + verifyMetricValue(emitter, "mergeBuffer/acquisitionTimeNs", dimFilters, 100L); + verifyMetricValue(emitter, "groupBy/spilledQueries", dimFilters, 2L); + verifyMetricValue(emitter, "groupBy/spilledBytes", dimFilters, 200L); + verifyMetricValue(emitter, "groupBy/mergeDictionarySize", dimFilters, 300L); } @Test @@ -181,8 +185,7 @@ public class GroupByStatsMonitorTest } } - - private void verifyTaskServiceDimensions(StubServiceEmitter emitter, String metricName, Map<String, Object> dimFilters, Number expectedValue) + private void verifyMetricValue(StubServiceEmitter emitter, String metricName, Map<String, Object> dimFilters, Number expectedValue) { final List<ServiceMetricEvent> observedMetricEvents = emitter.getMetricEvents(metricName); Assert.assertEquals(1, observedMetricEvents.size()); diff --git a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java index 8f5ad497928..e9779c1ddd6 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java +++ b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java @@ -24,6 +24,7 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.apache.druid.java.util.metrics.TaskHolder; import org.hamcrest.Matcher; import org.hamcrest.Matchers; import org.junit.jupiter.api.Timeout; @@ -72,9 +73,9 @@ public class LatchableEmitter extends StubServiceEmitter /** * Creates a {@link StubServiceEmitter} that may be used in embedded tests. */ - public LatchableEmitter(String service, String host, LatchableEmitterConfig config) + public LatchableEmitter(String service, String host, LatchableEmitterConfig config, TaskHolder taskHolder) { - super(service, host); + super(service, host, taskHolder); this.defaultWaitTimeoutMillis = config.getDefaultWaitTimeoutMillis(); } diff --git a/services/src/main/java/org/apache/druid/cli/PeonTaskHolder.java b/services/src/main/java/org/apache/druid/cli/PeonTaskHolder.java index 6188cb6304e..74ce547ed90 100644 --- a/services/src/main/java/org/apache/druid/cli/PeonTaskHolder.java +++ b/services/src/main/java/org/apache/druid/cli/PeonTaskHolder.java @@ -84,12 +84,13 @@ public class PeonTaskHolder implements TaskHolder @Override public Map<String, String> getMetricDimensions() { + final Task task = taskProvider.get(); return Map.of( - DruidMetrics.DATASOURCE, getDataSource(), - DruidMetrics.TASK_ID, getTaskId(), - DruidMetrics.ID, getTaskId(), - DruidMetrics.TASK_TYPE, getTaskType(), - DruidMetrics.GROUP_ID, getGroupId() + DruidMetrics.DATASOURCE, task.getDataSource(), + DruidMetrics.TASK_ID, task.getId(), + DruidMetrics.ID, task.getId(), + DruidMetrics.TASK_TYPE, task.getType(), + DruidMetrics.GROUP_ID, task.getGroupId() ); } } diff --git a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java index 24416859ea2..f79e239d586 100644 --- a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java +++ b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java @@ -500,7 +500,7 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest } }; - final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", ""); + final StubServiceEmitter stubServiceEmitter = StubServiceEmitter.createStarted(); final AsyncQueryForwardingServlet servlet = new AsyncQueryForwardingServlet( new MapQueryToolChestWarehouse(ImmutableMap.of()), TestHelper.makeJsonMapper(), @@ -571,7 +571,7 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest } }; - final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", ""); + final StubServiceEmitter stubServiceEmitter = StubServiceEmitter.createStarted(); final AsyncQueryForwardingServlet servlet = new AsyncQueryForwardingServlet( new MapQueryToolChestWarehouse(ImmutableMap.of()), TestHelper.makeJsonMapper(), @@ -624,7 +624,7 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest Mockito.when(responseMock.getStatus()).thenReturn(0); // Test unassigned http status code case from server Mockito.when(responseMock.getHeaders()).thenReturn(HttpFields.build()); - final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", ""); + final StubServiceEmitter stubServiceEmitter = StubServiceEmitter.createStarted(); final AsyncQueryForwardingServlet servlet = new AsyncQueryForwardingServlet( new MapQueryToolChestWarehouse(ImmutableMap.of()), TestHelper.makeJsonMapper(), @@ -871,7 +871,7 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest } }; final Result result = new Result(proxyRequestMock, response); - final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", ""); + final StubServiceEmitter stubServiceEmitter = StubServiceEmitter.createStarted(); final AsyncQueryForwardingServlet servlet = new AsyncQueryForwardingServlet( new MapQueryToolChestWarehouse(ImmutableMap.of()), jsonMapper, diff --git a/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java b/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java index 78603090e5f..9bd3c8bc30e 100644 --- a/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java +++ b/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java @@ -28,6 +28,7 @@ import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.annotations.Self; import org.apache.druid.initialization.DruidModule; import org.apache.druid.java.util.emitter.core.Emitter; +import org.apache.druid.java.util.metrics.TaskHolder; import org.apache.druid.server.DruidNode; import org.apache.druid.server.metrics.LatchableEmitter; import org.apache.druid.server.metrics.LatchableEmitterConfig; @@ -51,9 +52,10 @@ public class LatchableEmitterModule implements DruidModule @ManageLifecycle public LatchableEmitter makeEmitter( @Self DruidNode selfNode, - LatchableEmitterConfig config + LatchableEmitterConfig config, + TaskHolder taskHolder ) { - return new LatchableEmitter(selfNode.getServiceName(), selfNode.getHost(), config); + return new LatchableEmitter(selfNode.getServiceName(), selfNode.getHost(), config, taskHolder); } } diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java index 894619b9ea3..6fb974a527c 100644 --- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java +++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java @@ -277,7 +277,7 @@ public class SqlResourceTest extends CalciteTestBase } } }; - stubServiceEmitter = new StubServiceEmitter("test", "test"); + stubServiceEmitter = StubServiceEmitter.createStarted(); final AuthConfig authConfig = new AuthConfig(); final DefaultQueryConfig defaultQueryConfig = new DefaultQueryConfig(ImmutableMap.of()); final SqlToolbox sqlToolbox = new SqlToolbox( --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
