This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ea51d8a  Duties in Indexing group (such as Auto Compaction) does not 
report metrics (#12352)
ea51d8a is described below

commit ea51d8a16c77dbb4074f5f665bbefb1cdc9e3406
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Wed Mar 23 18:18:28 2022 -0700

    Duties in Indexing group (such as Auto Compaction) does not report metrics 
(#12352)
    
    * add impl
    
    * add unit tests
    
    * fix checkstyle
    
    * address comments
    
    * fix checkstyle
---
 .../druid/server/coordinator/DruidCoordinator.java |  23 +++-
 .../duty/EmitClusterStatsAndMetrics.java           | 107 ++++++++++++------
 .../server/coordinator/DruidCoordinatorTest.java   |  30 ++++++
 .../duty/EmitClusterStatsAndMetricsTest.java       | 119 +++++++++++++++++++++
 4 files changed, 244 insertions(+), 35 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 8c060ee..6b1e29d 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -169,7 +169,7 @@ public class DruidCoordinator
   private int cachedBalancerThreadNumber;
   private ListeningExecutorService balancerExec;
 
-  private static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = 
"HistoricalManagementDuties";
+  public static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = 
"HistoricalManagementDuties";
   private static final String METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP = 
"MetadataStoreManagementDuties";
   private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP = 
"IndexingServiceDuties";
   private static final String COMPACT_SEGMENTS_DUTIES_DUTY_GROUP = 
"CompactSegmentsDuties";
@@ -765,8 +765,7 @@ public class DruidCoordinator
         new RunRules(DruidCoordinator.this),
         new UnloadUnusedSegments(),
         new MarkAsUnusedOvershadowedSegments(DruidCoordinator.this),
-        new BalanceSegments(DruidCoordinator.this),
-        new EmitClusterStatsAndMetrics(DruidCoordinator.this)
+        new BalanceSegments(DruidCoordinator.this)
     );
   }
 
@@ -841,7 +840,17 @@ public class DruidCoordinator
 
     protected DutiesRunnable(List<? extends CoordinatorDuty> duties, final int 
startingLeaderCounter, String alias)
     {
-      this.duties = duties;
+      // Automatically add EmitClusterStatsAndMetrics duty to the group if it 
does not already exist.
+      // This is to avoid human coding error (forgetting to add the 
EmitClusterStatsAndMetrics duty to the group)
+      // causing metrics from the duties to not be emitted.
+      if (duties.stream().noneMatch(duty -> duty instanceof 
EmitClusterStatsAndMetrics)) {
+        boolean isContainCompactSegmentDuty = duties.stream().anyMatch(duty -> 
duty instanceof CompactSegments);
+        List<CoordinatorDuty> allDuties = new ArrayList<>(duties);
+        allDuties.add(new EmitClusterStatsAndMetrics(DruidCoordinator.this, 
alias, isContainCompactSegmentDuty));
+        this.duties = allDuties;
+      } else {
+        this.duties = duties;
+      }
       this.startingLeaderCounter = startingLeaderCounter;
       this.dutiesRunnableAlias = alias;
     }
@@ -958,6 +967,12 @@ public class DruidCoordinator
         log.makeAlert(e, "Caught exception, ignoring so that schedule keeps 
going.").emit();
       }
     }
+
+    @VisibleForTesting
+    public List<? extends CoordinatorDuty> getDuties()
+    {
+      return duties;
+    }
   }
 
   /**
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
index 1f6cab3..71a18ce 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
@@ -48,10 +48,14 @@ public class EmitClusterStatsAndMetrics implements 
CoordinatorDuty
   public static final String MAX_REPLICATION_FACTOR = "maxReplicationFactor";
 
   private final DruidCoordinator coordinator;
+  private final String groupName;
+  private final boolean isContainCompactSegmentDuty;
 
-  public EmitClusterStatsAndMetrics(DruidCoordinator coordinator)
+  public EmitClusterStatsAndMetrics(DruidCoordinator coordinator, String 
groupName, boolean isContainCompactSegmentDuty)
   {
     this.coordinator = coordinator;
+    this.groupName = groupName;
+    this.isContainCompactSegmentDuty = isContainCompactSegmentDuty;
   }
 
   private void emitTieredStat(
@@ -64,6 +68,7 @@ public class EmitClusterStatsAndMetrics implements 
CoordinatorDuty
     emitter.emit(
         new ServiceMetricEvent.Builder()
             .setDimension(DruidMetrics.TIER, tier)
+            .setDimension(DruidMetrics.DUTY_GROUP, groupName)
             .build(metricName, value)
     );
   }
@@ -78,6 +83,7 @@ public class EmitClusterStatsAndMetrics implements 
CoordinatorDuty
     emitter.emit(
         new ServiceMetricEvent.Builder()
             .setDimension(DruidMetrics.TIER, tier)
+            .setDimension(DruidMetrics.DUTY_GROUP, groupName)
             .build(metricName, value)
     );
   }
@@ -107,6 +113,7 @@ public class EmitClusterStatsAndMetrics implements 
CoordinatorDuty
     emitter.emit(
         new ServiceMetricEvent.Builder()
             .setDimension(DruidMetrics.DUTY, duty)
+            .setDimension(DruidMetrics.DUTY_GROUP, groupName)
             .build(metricName, value)
     );
   }
@@ -133,6 +140,21 @@ public class EmitClusterStatsAndMetrics implements 
CoordinatorDuty
     CoordinatorStats stats = params.getCoordinatorStats();
     ServiceEmitter emitter = params.getEmitter();
 
+    if 
(DruidCoordinator.HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP.equals(groupName)) {
+      emitStatsForHistoricalManagementDuties(cluster, stats, emitter, params);
+    }
+    if (isContainCompactSegmentDuty) {
+      emitStatsForCompactSegments(cluster, stats, emitter);
+    }
+
+    // Emit coordinator runtime stats
+    emitDutyStats(emitter, "coordinator/time", stats, "runtime");
+
+    return params;
+  }
+
+  private void emitStatsForHistoricalManagementDuties(DruidCluster cluster, 
CoordinatorStats stats, ServiceEmitter emitter, DruidCoordinatorRuntimeParams 
params)
+  {
     stats.forEachTieredStat(
         "assignedCount",
         (final String tier, final long count) -> {
@@ -190,7 +212,9 @@ public class EmitClusterStatsAndMetrics implements 
CoordinatorDuty
     );
 
     emitter.emit(
-        new ServiceMetricEvent.Builder().build(
+        new ServiceMetricEvent.Builder()
+            .setDimension(DruidMetrics.DUTY_GROUP, groupName)
+            .build(
             "segment/overShadowed/count",
             stats.getGlobalStat("overShadowedCount")
         )
@@ -269,24 +293,28 @@ public class EmitClusterStatsAndMetrics implements 
CoordinatorDuty
         .forEach((final String serverName, final LoadQueuePeon queuePeon) -> {
           emitter.emit(
               new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DUTY_GROUP, groupName)
                   .setDimension(DruidMetrics.SERVER, serverName).build(
                   "segment/loadQueue/size", queuePeon.getLoadQueueSize()
               )
           );
           emitter.emit(
               new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DUTY_GROUP, groupName)
                   .setDimension(DruidMetrics.SERVER, serverName).build(
                   "segment/loadQueue/failed", 
queuePeon.getAndResetFailedAssignCount()
               )
           );
           emitter.emit(
               new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DUTY_GROUP, groupName)
                   .setDimension(DruidMetrics.SERVER, serverName).build(
                   "segment/loadQueue/count", 
queuePeon.getSegmentsToLoad().size()
               )
           );
           emitter.emit(
               new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DUTY_GROUP, groupName)
                   .setDimension(DruidMetrics.SERVER, serverName).build(
                   "segment/dropQueue/count", 
queuePeon.getSegmentsToDrop().size()
               )
@@ -299,6 +327,7 @@ public class EmitClusterStatsAndMetrics implements 
CoordinatorDuty
           final int numUnavailableUsedSegmentsInDataSource = 
entry.getIntValue();
           emitter.emit(
               new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DUTY_GROUP, groupName)
                   .setDimension(DruidMetrics.DATASOURCE, dataSource).build(
                   "segment/unavailable/count", 
numUnavailableUsedSegmentsInDataSource
               )
@@ -314,6 +343,7 @@ public class EmitClusterStatsAndMetrics implements 
CoordinatorDuty
 
             emitter.emit(
                 new ServiceMetricEvent.Builder()
+                    .setDimension(DruidMetrics.DUTY_GROUP, groupName)
                     .setDimension(DruidMetrics.TIER, tier)
                     .setDimension(DruidMetrics.DATASOURCE, dataSource).build(
                     "segment/underReplicated/count", underReplicationCount
@@ -323,22 +353,54 @@ public class EmitClusterStatsAndMetrics implements 
CoordinatorDuty
         }
     );
 
+    // Emit segment metrics
+    params.getUsedSegmentsTimelinesPerDataSource().forEach(
+        (String dataSource, VersionedIntervalTimeline<String, DataSegment> 
dataSourceWithUsedSegments) -> {
+          long totalSizeOfUsedSegments = dataSourceWithUsedSegments
+              .iterateAllObjects()
+              .stream()
+              .mapToLong(DataSegment::getSize)
+              .sum();
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DUTY_GROUP, groupName)
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/size", totalSizeOfUsedSegments)
+          );
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DUTY_GROUP, groupName)
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/count", 
dataSourceWithUsedSegments.getNumObjects())
+          );
+        }
+    );
+  }
+
+  private void emitStatsForCompactSegments(DruidCluster cluster, 
CoordinatorStats stats, ServiceEmitter emitter)
+  {
     emitter.emit(
-        new ServiceMetricEvent.Builder().build(
+        new ServiceMetricEvent.Builder()
+            .setDimension(DruidMetrics.DUTY_GROUP, groupName)
+            .build(
             "compact/task/count",
             stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
         )
     );
 
     emitter.emit(
-        new ServiceMetricEvent.Builder().build(
+        new ServiceMetricEvent.Builder()
+            .setDimension(DruidMetrics.DUTY_GROUP, groupName)
+            .build(
             "compactTask/maxSlot/count",
             stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
         )
     );
 
     emitter.emit(
-        new ServiceMetricEvent.Builder().build(
+        new ServiceMetricEvent.Builder()
+            .setDimension(DruidMetrics.DUTY_GROUP, groupName)
+            .build(
             "compactTask/availableSlot/count",
             stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
         )
@@ -349,6 +411,7 @@ public class EmitClusterStatsAndMetrics implements 
CoordinatorDuty
         (final String dataSource, final long count) -> {
           emitter.emit(
               new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DUTY_GROUP, groupName)
                   .setDimension(DruidMetrics.DATASOURCE, dataSource)
                   .build("segment/waitCompact/bytes", count)
           );
@@ -360,6 +423,7 @@ public class EmitClusterStatsAndMetrics implements 
CoordinatorDuty
         (final String dataSource, final long count) -> {
           emitter.emit(
               new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DUTY_GROUP, groupName)
                   .setDimension(DruidMetrics.DATASOURCE, dataSource)
                   .build("segment/waitCompact/count", count)
           );
@@ -371,6 +435,7 @@ public class EmitClusterStatsAndMetrics implements 
CoordinatorDuty
         (final String dataSource, final long count) -> {
           emitter.emit(
               new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DUTY_GROUP, groupName)
                   .setDimension(DruidMetrics.DATASOURCE, dataSource)
                   .build("interval/waitCompact/count", count)
           );
@@ -382,6 +447,7 @@ public class EmitClusterStatsAndMetrics implements 
CoordinatorDuty
         (final String dataSource, final long count) -> {
           emitter.emit(
               new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DUTY_GROUP, groupName)
                   .setDimension(DruidMetrics.DATASOURCE, dataSource)
                   .build("segment/skipCompact/bytes", count)
           );
@@ -393,6 +459,7 @@ public class EmitClusterStatsAndMetrics implements 
CoordinatorDuty
         (final String dataSource, final long count) -> {
           emitter.emit(
               new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DUTY_GROUP, groupName)
                   .setDimension(DruidMetrics.DATASOURCE, dataSource)
                   .build("segment/skipCompact/count", count)
           );
@@ -404,6 +471,7 @@ public class EmitClusterStatsAndMetrics implements 
CoordinatorDuty
         (final String dataSource, final long count) -> {
           emitter.emit(
               new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DUTY_GROUP, groupName)
                   .setDimension(DruidMetrics.DATASOURCE, dataSource)
                   .build("interval/skipCompact/count", count)
           );
@@ -415,6 +483,7 @@ public class EmitClusterStatsAndMetrics implements 
CoordinatorDuty
         (final String dataSource, final long count) -> {
           emitter.emit(
               new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DUTY_GROUP, groupName)
                   .setDimension(DruidMetrics.DATASOURCE, dataSource)
                   .build("segment/compacted/bytes", count)
           );
@@ -426,6 +495,7 @@ public class EmitClusterStatsAndMetrics implements 
CoordinatorDuty
         (final String dataSource, final long count) -> {
           emitter.emit(
               new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DUTY_GROUP, groupName)
                   .setDimension(DruidMetrics.DATASOURCE, dataSource)
                   .build("segment/compacted/count", count)
           );
@@ -437,36 +507,11 @@ public class EmitClusterStatsAndMetrics implements 
CoordinatorDuty
         (final String dataSource, final long count) -> {
           emitter.emit(
               new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DUTY_GROUP, groupName)
                   .setDimension(DruidMetrics.DATASOURCE, dataSource)
                   .build("interval/compacted/count", count)
           );
         }
     );
-
-    // Emit segment metrics
-    params.getUsedSegmentsTimelinesPerDataSource().forEach(
-        (String dataSource, VersionedIntervalTimeline<String, DataSegment> 
dataSourceWithUsedSegments) -> {
-          long totalSizeOfUsedSegments = dataSourceWithUsedSegments
-              .iterateAllObjects()
-              .stream()
-              .mapToLong(DataSegment::getSize)
-              .sum();
-          emitter.emit(
-              new ServiceMetricEvent.Builder()
-                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
-                  .build("segment/size", totalSizeOfUsedSegments)
-          );
-          emitter.emit(
-              new ServiceMetricEvent.Builder()
-                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
-                  .build("segment/count", 
dataSourceWithUsedSegments.getNumObjects())
-          );
-        }
-    );
-
-    // Emit coordinator runtime stats
-    emitDutyStats(emitter, "coordinator/time", stats, "runtime");
-
-    return params;
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 0363748..7a12a7a 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -57,7 +57,9 @@ import 
org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
 import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
+import org.apache.druid.server.coordinator.duty.EmitClusterStatsAndMetrics;
 import org.apache.druid.server.coordinator.duty.KillSupervisorsCustomDuty;
+import org.apache.druid.server.coordinator.duty.LogUsedSegments;
 import 
org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
 import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
 import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
@@ -1133,6 +1135,34 @@ public class DruidCoordinatorTest extends CuratorTestBase
     latch2.await();
   }
 
+  @Test
+  public void testEmitClusterStatsAndMetricsAddedWhenNotInDutyList()
+  {
+    DruidCoordinator.DutiesRunnable dutyRunnable = coordinator.new 
DutiesRunnable(ImmutableList.of(new LogUsedSegments()), 0, "TEST");
+    List<? extends CoordinatorDuty> duties = dutyRunnable.getDuties();
+    int emitDutyFound = 0;
+    for (CoordinatorDuty duty : duties) {
+      if (duty instanceof EmitClusterStatsAndMetrics) {
+        emitDutyFound++;
+      }
+    }
+    Assert.assertEquals(1, emitDutyFound);
+  }
+
+  @Test
+  public void testEmitClusterStatsAndMetricsNotAddedAgainWhenInDutyList()
+  {
+    DruidCoordinator.DutiesRunnable dutyRunnable = coordinator.new 
DutiesRunnable(ImmutableList.of(new LogUsedSegments(), new 
EmitClusterStatsAndMetrics(coordinator, "TEST", false)), 0, "TEST");
+    List<? extends CoordinatorDuty> duties = dutyRunnable.getDuties();
+    int emitDutyFound = 0;
+    for (CoordinatorDuty duty : duties) {
+      if (duty instanceof EmitClusterStatsAndMetrics) {
+        emitDutyFound++;
+      }
+    }
+    Assert.assertEquals(1, emitDutyFound);
+  }
+
   private CountDownLatch 
createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(int latchCount,
                                                                                
       PathChildrenCache pathChildrenCache,
                                                                                
       Map<String, DataSegment> segments,
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetricsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetricsTest.java
new file mode 100644
index 0000000..00061b1
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetricsTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.collect.ImmutableMap;
+import it.unimi.dsi.fastutil.objects.Object2IntMaps;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.metadata.MetadataRuleManager;
+import org.apache.druid.server.coordinator.CoordinatorStats;
+import org.apache.druid.server.coordinator.DruidCluster;
+import org.apache.druid.server.coordinator.DruidCoordinator;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.List;
+
+@RunWith(MockitoJUnitRunner.class)
+public class EmitClusterStatsAndMetricsTest
+{
+  @Mock
+  private ServiceEmitter mockServiceEmitter;
+  @Mock
+  private DruidCoordinator mockDruidCoordinator;
+  @Mock
+  private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams;
+  @Mock
+  CoordinatorStats mockCoordinatorStats;
+  @Mock
+  DruidCluster mockDruidCluster;
+  @Mock
+  MetadataRuleManager mockMetadataRuleManager;
+
+  @Test
+  public void testRunOnlyEmitStatsForHistoricalDuties()
+  {
+    ArgumentCaptor<ServiceEventBuilder> argumentCaptor = 
ArgumentCaptor.forClass(ServiceEventBuilder.class);
+    
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
+    
Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(mockCoordinatorStats);
+    
Mockito.when(mockDruidCoordinatorRuntimeParams.getDruidCluster()).thenReturn(mockDruidCluster);
+    
Mockito.when(mockDruidCoordinatorRuntimeParams.getDatabaseRuleManager()).thenReturn(mockMetadataRuleManager);
+    
Mockito.when(mockDruidCoordinator.computeNumsUnavailableUsedSegmentsPerDataSource()).thenReturn(Object2IntMaps.emptyMap());
+    
Mockito.when(mockDruidCoordinator.computeUnderReplicationCountsPerDataSourcePerTier()).thenReturn(ImmutableMap.of());
+    CoordinatorDuty duty = new 
EmitClusterStatsAndMetrics(mockDruidCoordinator, 
DruidCoordinator.HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP, false);
+    duty.run(mockDruidCoordinatorRuntimeParams);
+    Mockito.verify(mockServiceEmitter, 
Mockito.atLeastOnce()).emit(argumentCaptor.capture());
+    List<ServiceEventBuilder> emittedEvents = argumentCaptor.getAllValues();
+    boolean foundCompactMetric = false;
+    boolean foundHistoricalDutyMetric = false;
+    for (ServiceEventBuilder eventBuilder : emittedEvents) {
+      ServiceMetricEvent serviceMetricEvent = ((ServiceMetricEvent) 
eventBuilder.build("x", "x"));
+      String metric = serviceMetricEvent.getMetric();
+      if ("segment/overShadowed/count".equals(metric)) {
+        foundHistoricalDutyMetric = true;
+      } else if ("compact/task/count".equals(metric)) {
+        foundCompactMetric = true;
+      }
+      String dutyGroup = (String) 
serviceMetricEvent.getUserDims().get("dutyGroup");
+      Assert.assertNotNull(dutyGroup);
+      
Assert.assertEquals(DruidCoordinator.HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP, 
dutyGroup);
+    }
+    Assert.assertTrue(foundHistoricalDutyMetric);
+    Assert.assertFalse(foundCompactMetric);
+  }
+
+  @Test
+  public void testRunEmitStatsForCompactionWhenHaveCompactSegmentDuty()
+  {
+    String groupName = "blah";
+    ArgumentCaptor<ServiceEventBuilder> argumentCaptor = 
ArgumentCaptor.forClass(ServiceEventBuilder.class);
+    
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
+    
Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(mockCoordinatorStats);
+    
Mockito.when(mockDruidCoordinatorRuntimeParams.getDruidCluster()).thenReturn(mockDruidCluster);
+    CoordinatorDuty duty = new 
EmitClusterStatsAndMetrics(mockDruidCoordinator, groupName, true);
+    duty.run(mockDruidCoordinatorRuntimeParams);
+    Mockito.verify(mockServiceEmitter, 
Mockito.atLeastOnce()).emit(argumentCaptor.capture());
+    List<ServiceEventBuilder> emittedEvents = argumentCaptor.getAllValues();
+    boolean foundCompactMetric = false;
+    boolean foundHistoricalDutyMetric = false;
+    for (ServiceEventBuilder eventBuilder : emittedEvents) {
+      ServiceMetricEvent serviceMetricEvent = ((ServiceMetricEvent) 
eventBuilder.build("x", "x"));
+      String metric = serviceMetricEvent.getMetric();
+      if ("segment/overShadowed/count".equals(metric)) {
+        foundHistoricalDutyMetric = true;
+      } else if ("compact/task/count".equals(metric)) {
+        foundCompactMetric = true;
+      }
+      String dutyGroup = (String) 
serviceMetricEvent.getUserDims().get("dutyGroup");
+      Assert.assertNotNull(dutyGroup);
+      Assert.assertEquals(groupName, dutyGroup);
+    }
+    Assert.assertFalse(foundHistoricalDutyMetric);
+    Assert.assertTrue(foundCompactMetric);
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to