This is an automated email from the ASF dual-hosted git repository.
maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ea51d8a Duties in Indexing group (such as Auto Compaction) do not
report metrics (#12352)
ea51d8a is described below
commit ea51d8a16c77dbb4074f5f665bbefb1cdc9e3406
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Wed Mar 23 18:18:28 2022 -0700
Duties in Indexing group (such as Auto Compaction) do not report metrics
(#12352)
* add impl
* add unit tests
* fix checkstyle
* address comments
* fix checkstyle
---
.../druid/server/coordinator/DruidCoordinator.java | 23 +++-
.../duty/EmitClusterStatsAndMetrics.java | 107 ++++++++++++------
.../server/coordinator/DruidCoordinatorTest.java | 30 ++++++
.../duty/EmitClusterStatsAndMetricsTest.java | 119 +++++++++++++++++++++
4 files changed, 244 insertions(+), 35 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 8c060ee..6b1e29d 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -169,7 +169,7 @@ public class DruidCoordinator
private int cachedBalancerThreadNumber;
private ListeningExecutorService balancerExec;
- private static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP =
"HistoricalManagementDuties";
+ public static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP =
"HistoricalManagementDuties";
private static final String METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP =
"MetadataStoreManagementDuties";
private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP =
"IndexingServiceDuties";
private static final String COMPACT_SEGMENTS_DUTIES_DUTY_GROUP =
"CompactSegmentsDuties";
@@ -765,8 +765,7 @@ public class DruidCoordinator
new RunRules(DruidCoordinator.this),
new UnloadUnusedSegments(),
new MarkAsUnusedOvershadowedSegments(DruidCoordinator.this),
- new BalanceSegments(DruidCoordinator.this),
- new EmitClusterStatsAndMetrics(DruidCoordinator.this)
+ new BalanceSegments(DruidCoordinator.this)
);
}
@@ -841,7 +840,17 @@ public class DruidCoordinator
protected DutiesRunnable(List<? extends CoordinatorDuty> duties, final int
startingLeaderCounter, String alias)
{
- this.duties = duties;
+ // Automatically add EmitClusterStatsAndMetrics duty to the group if it
does not already exist
+ // This is to avoid human coding error (forgetting to add the
EmitClusterStatsAndMetrics duty to the group)
+ // causing metrics from the duties to not be emitted.
+ if (duties.stream().noneMatch(duty -> duty instanceof
EmitClusterStatsAndMetrics)) {
+ boolean isContainCompactSegmentDuty = duties.stream().anyMatch(duty ->
duty instanceof CompactSegments);
+ List<CoordinatorDuty> allDuties = new ArrayList<>(duties);
+ allDuties.add(new EmitClusterStatsAndMetrics(DruidCoordinator.this,
alias, isContainCompactSegmentDuty));
+ this.duties = allDuties;
+ } else {
+ this.duties = duties;
+ }
this.startingLeaderCounter = startingLeaderCounter;
this.dutiesRunnableAlias = alias;
}
@@ -958,6 +967,12 @@ public class DruidCoordinator
log.makeAlert(e, "Caught exception, ignoring so that schedule keeps
going.").emit();
}
}
+
+ @VisibleForTesting
+ public List<? extends CoordinatorDuty> getDuties()
+ {
+ return duties;
+ }
}
/**
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
index 1f6cab3..71a18ce 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
@@ -48,10 +48,14 @@ public class EmitClusterStatsAndMetrics implements
CoordinatorDuty
public static final String MAX_REPLICATION_FACTOR = "maxReplicationFactor";
private final DruidCoordinator coordinator;
+ private final String groupName;
+ private final boolean isContainCompactSegmentDuty;
- public EmitClusterStatsAndMetrics(DruidCoordinator coordinator)
+ public EmitClusterStatsAndMetrics(DruidCoordinator coordinator, String
groupName, boolean isContainCompactSegmentDuty)
{
this.coordinator = coordinator;
+ this.groupName = groupName;
+ this.isContainCompactSegmentDuty = isContainCompactSegmentDuty;
}
private void emitTieredStat(
@@ -64,6 +68,7 @@ public class EmitClusterStatsAndMetrics implements
CoordinatorDuty
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.TIER, tier)
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
.build(metricName, value)
);
}
@@ -78,6 +83,7 @@ public class EmitClusterStatsAndMetrics implements
CoordinatorDuty
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.TIER, tier)
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
.build(metricName, value)
);
}
@@ -107,6 +113,7 @@ public class EmitClusterStatsAndMetrics implements
CoordinatorDuty
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY, duty)
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
.build(metricName, value)
);
}
@@ -133,6 +140,21 @@ public class EmitClusterStatsAndMetrics implements
CoordinatorDuty
CoordinatorStats stats = params.getCoordinatorStats();
ServiceEmitter emitter = params.getEmitter();
+ if
(DruidCoordinator.HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP.equals(groupName)) {
+ emitStatsForHistoricalManagementDuties(cluster, stats, emitter, params);
+ }
+ if (isContainCompactSegmentDuty) {
+ emitStatsForCompactSegments(cluster, stats, emitter);
+ }
+
+ // Emit coordinator runtime stats
+ emitDutyStats(emitter, "coordinator/time", stats, "runtime");
+
+ return params;
+ }
+
+ private void emitStatsForHistoricalManagementDuties(DruidCluster cluster,
CoordinatorStats stats, ServiceEmitter emitter, DruidCoordinatorRuntimeParams
params)
+ {
stats.forEachTieredStat(
"assignedCount",
(final String tier, final long count) -> {
@@ -190,7 +212,9 @@ public class EmitClusterStatsAndMetrics implements
CoordinatorDuty
);
emitter.emit(
- new ServiceMetricEvent.Builder().build(
+ new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
+ .build(
"segment/overShadowed/count",
stats.getGlobalStat("overShadowedCount")
)
@@ -269,24 +293,28 @@ public class EmitClusterStatsAndMetrics implements
CoordinatorDuty
.forEach((final String serverName, final LoadQueuePeon queuePeon) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.SERVER, serverName).build(
"segment/loadQueue/size", queuePeon.getLoadQueueSize()
)
);
emitter.emit(
new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.SERVER, serverName).build(
"segment/loadQueue/failed",
queuePeon.getAndResetFailedAssignCount()
)
);
emitter.emit(
new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.SERVER, serverName).build(
"segment/loadQueue/count",
queuePeon.getSegmentsToLoad().size()
)
);
emitter.emit(
new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.SERVER, serverName).build(
"segment/dropQueue/count",
queuePeon.getSegmentsToDrop().size()
)
@@ -299,6 +327,7 @@ public class EmitClusterStatsAndMetrics implements
CoordinatorDuty
final int numUnavailableUsedSegmentsInDataSource =
entry.getIntValue();
emitter.emit(
new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource).build(
"segment/unavailable/count",
numUnavailableUsedSegmentsInDataSource
)
@@ -314,6 +343,7 @@ public class EmitClusterStatsAndMetrics implements
CoordinatorDuty
emitter.emit(
new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.TIER, tier)
.setDimension(DruidMetrics.DATASOURCE, dataSource).build(
"segment/underReplicated/count", underReplicationCount
@@ -323,22 +353,54 @@ public class EmitClusterStatsAndMetrics implements
CoordinatorDuty
}
);
+ // Emit segment metrics
+ params.getUsedSegmentsTimelinesPerDataSource().forEach(
+ (String dataSource, VersionedIntervalTimeline<String, DataSegment>
dataSourceWithUsedSegments) -> {
+ long totalSizeOfUsedSegments = dataSourceWithUsedSegments
+ .iterateAllObjects()
+ .stream()
+ .mapToLong(DataSegment::getSize)
+ .sum();
+ emitter.emit(
+ new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
+ .setDimension(DruidMetrics.DATASOURCE, dataSource)
+ .build("segment/size", totalSizeOfUsedSegments)
+ );
+ emitter.emit(
+ new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
+ .setDimension(DruidMetrics.DATASOURCE, dataSource)
+ .build("segment/count",
dataSourceWithUsedSegments.getNumObjects())
+ );
+ }
+ );
+ }
+
+ private void emitStatsForCompactSegments(DruidCluster cluster,
CoordinatorStats stats, ServiceEmitter emitter)
+ {
emitter.emit(
- new ServiceMetricEvent.Builder().build(
+ new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
+ .build(
"compact/task/count",
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
)
);
emitter.emit(
- new ServiceMetricEvent.Builder().build(
+ new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
+ .build(
"compactTask/maxSlot/count",
stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
)
);
emitter.emit(
- new ServiceMetricEvent.Builder().build(
+ new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
+ .build(
"compactTask/availableSlot/count",
stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
)
@@ -349,6 +411,7 @@ public class EmitClusterStatsAndMetrics implements
CoordinatorDuty
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/waitCompact/bytes", count)
);
@@ -360,6 +423,7 @@ public class EmitClusterStatsAndMetrics implements
CoordinatorDuty
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/waitCompact/count", count)
);
@@ -371,6 +435,7 @@ public class EmitClusterStatsAndMetrics implements
CoordinatorDuty
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("interval/waitCompact/count", count)
);
@@ -382,6 +447,7 @@ public class EmitClusterStatsAndMetrics implements
CoordinatorDuty
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/skipCompact/bytes", count)
);
@@ -393,6 +459,7 @@ public class EmitClusterStatsAndMetrics implements
CoordinatorDuty
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/skipCompact/count", count)
);
@@ -404,6 +471,7 @@ public class EmitClusterStatsAndMetrics implements
CoordinatorDuty
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("interval/skipCompact/count", count)
);
@@ -415,6 +483,7 @@ public class EmitClusterStatsAndMetrics implements
CoordinatorDuty
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/compacted/bytes", count)
);
@@ -426,6 +495,7 @@ public class EmitClusterStatsAndMetrics implements
CoordinatorDuty
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/compacted/count", count)
);
@@ -437,36 +507,11 @@ public class EmitClusterStatsAndMetrics implements
CoordinatorDuty
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("interval/compacted/count", count)
);
}
);
-
- // Emit segment metrics
- params.getUsedSegmentsTimelinesPerDataSource().forEach(
- (String dataSource, VersionedIntervalTimeline<String, DataSegment>
dataSourceWithUsedSegments) -> {
- long totalSizeOfUsedSegments = dataSourceWithUsedSegments
- .iterateAllObjects()
- .stream()
- .mapToLong(DataSegment::getSize)
- .sum();
- emitter.emit(
- new ServiceMetricEvent.Builder()
- .setDimension(DruidMetrics.DATASOURCE, dataSource)
- .build("segment/size", totalSizeOfUsedSegments)
- );
- emitter.emit(
- new ServiceMetricEvent.Builder()
- .setDimension(DruidMetrics.DATASOURCE, dataSource)
- .build("segment/count",
dataSourceWithUsedSegments.getNumObjects())
- );
- }
- );
-
- // Emit coordinator runtime stats
- emitDutyStats(emitter, "coordinator/time", stats, "runtime");
-
- return params;
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 0363748..7a12a7a 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -57,7 +57,9 @@ import
org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
+import org.apache.druid.server.coordinator.duty.EmitClusterStatsAndMetrics;
import org.apache.druid.server.coordinator.duty.KillSupervisorsCustomDuty;
+import org.apache.druid.server.coordinator.duty.LogUsedSegments;
import
org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
@@ -1133,6 +1135,34 @@ public class DruidCoordinatorTest extends CuratorTestBase
latch2.await();
}
+ @Test
+ public void testEmitClusterStatsAndMetricsAddedWhenNotInDutyList()
+ {
+ DruidCoordinator.DutiesRunnable dutyRunnable = coordinator.new
DutiesRunnable(ImmutableList.of(new LogUsedSegments()), 0, "TEST");
+ List<? extends CoordinatorDuty> duties = dutyRunnable.getDuties();
+ int emitDutyFound = 0;
+ for (CoordinatorDuty duty : duties) {
+ if (duty instanceof EmitClusterStatsAndMetrics) {
+ emitDutyFound++;
+ }
+ }
+ Assert.assertEquals(1, emitDutyFound);
+ }
+
+ @Test
+ public void testEmitClusterStatsAndMetricsNotAddedAgainWhenInDutyList()
+ {
+ DruidCoordinator.DutiesRunnable dutyRunnable = coordinator.new
DutiesRunnable(ImmutableList.of(new LogUsedSegments(), new
EmitClusterStatsAndMetrics(coordinator, "TEST", false)), 0, "TEST");
+ List<? extends CoordinatorDuty> duties = dutyRunnable.getDuties();
+ int emitDutyFound = 0;
+ for (CoordinatorDuty duty : duties) {
+ if (duty instanceof EmitClusterStatsAndMetrics) {
+ emitDutyFound++;
+ }
+ }
+ Assert.assertEquals(1, emitDutyFound);
+ }
+
private CountDownLatch
createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(int latchCount,
PathChildrenCache pathChildrenCache,
Map<String, DataSegment> segments,
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetricsTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetricsTest.java
new file mode 100644
index 0000000..00061b1
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetricsTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.collect.ImmutableMap;
+import it.unimi.dsi.fastutil.objects.Object2IntMaps;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.metadata.MetadataRuleManager;
+import org.apache.druid.server.coordinator.CoordinatorStats;
+import org.apache.druid.server.coordinator.DruidCluster;
+import org.apache.druid.server.coordinator.DruidCoordinator;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.List;
+
+@RunWith(MockitoJUnitRunner.class)
+public class EmitClusterStatsAndMetricsTest
+{
+ @Mock
+ private ServiceEmitter mockServiceEmitter;
+ @Mock
+ private DruidCoordinator mockDruidCoordinator;
+ @Mock
+ private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams;
+ @Mock
+ CoordinatorStats mockCoordinatorStats;
+ @Mock
+ DruidCluster mockDruidCluster;
+ @Mock
+ MetadataRuleManager mockMetadataRuleManager;
+
+ @Test
+ public void testRunOnlyEmitStatsForHistoricalDuties()
+ {
+ ArgumentCaptor<ServiceEventBuilder> argumentCaptor =
ArgumentCaptor.forClass(ServiceEventBuilder.class);
+
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
+
Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(mockCoordinatorStats);
+
Mockito.when(mockDruidCoordinatorRuntimeParams.getDruidCluster()).thenReturn(mockDruidCluster);
+
Mockito.when(mockDruidCoordinatorRuntimeParams.getDatabaseRuleManager()).thenReturn(mockMetadataRuleManager);
+
Mockito.when(mockDruidCoordinator.computeNumsUnavailableUsedSegmentsPerDataSource()).thenReturn(Object2IntMaps.emptyMap());
+
Mockito.when(mockDruidCoordinator.computeUnderReplicationCountsPerDataSourcePerTier()).thenReturn(ImmutableMap.of());
+ CoordinatorDuty duty = new
EmitClusterStatsAndMetrics(mockDruidCoordinator,
DruidCoordinator.HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP, false);
+ duty.run(mockDruidCoordinatorRuntimeParams);
+ Mockito.verify(mockServiceEmitter,
Mockito.atLeastOnce()).emit(argumentCaptor.capture());
+ List<ServiceEventBuilder> emittedEvents = argumentCaptor.getAllValues();
+ boolean foundCompactMetric = false;
+ boolean foundHistoricalDutyMetric = false;
+ for (ServiceEventBuilder eventBuilder : emittedEvents) {
+ ServiceMetricEvent serviceMetricEvent = ((ServiceMetricEvent)
eventBuilder.build("x", "x"));
+ String metric = serviceMetricEvent.getMetric();
+ if ("segment/overShadowed/count".equals(metric)) {
+ foundHistoricalDutyMetric = true;
+ } else if ("compact/task/count".equals(metric)) {
+ foundCompactMetric = true;
+ }
+ String dutyGroup = (String)
serviceMetricEvent.getUserDims().get("dutyGroup");
+ Assert.assertNotNull(dutyGroup);
+
Assert.assertEquals(DruidCoordinator.HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP,
dutyGroup);
+ }
+ Assert.assertTrue(foundHistoricalDutyMetric);
+ Assert.assertFalse(foundCompactMetric);
+ }
+
+ @Test
+ public void testRunEmitStatsForCompactionWhenHaveCompactSegmentDuty()
+ {
+ String groupName = "blah";
+ ArgumentCaptor<ServiceEventBuilder> argumentCaptor =
ArgumentCaptor.forClass(ServiceEventBuilder.class);
+
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
+
Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(mockCoordinatorStats);
+
Mockito.when(mockDruidCoordinatorRuntimeParams.getDruidCluster()).thenReturn(mockDruidCluster);
+ CoordinatorDuty duty = new
EmitClusterStatsAndMetrics(mockDruidCoordinator, groupName, true);
+ duty.run(mockDruidCoordinatorRuntimeParams);
+ Mockito.verify(mockServiceEmitter,
Mockito.atLeastOnce()).emit(argumentCaptor.capture());
+ List<ServiceEventBuilder> emittedEvents = argumentCaptor.getAllValues();
+ boolean foundCompactMetric = false;
+ boolean foundHistoricalDutyMetric = false;
+ for (ServiceEventBuilder eventBuilder : emittedEvents) {
+ ServiceMetricEvent serviceMetricEvent = ((ServiceMetricEvent)
eventBuilder.build("x", "x"));
+ String metric = serviceMetricEvent.getMetric();
+ if ("segment/overShadowed/count".equals(metric)) {
+ foundHistoricalDutyMetric = true;
+ } else if ("compact/task/count".equals(metric)) {
+ foundCompactMetric = true;
+ }
+ String dutyGroup = (String)
serviceMetricEvent.getUserDims().get("dutyGroup");
+ Assert.assertNotNull(dutyGroup);
+ Assert.assertEquals(groupName, dutyGroup);
+ }
+ Assert.assertFalse(foundHistoricalDutyMetric);
+ Assert.assertTrue(foundCompactMetric);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]