This is an automated email from the ASF dual-hosted git repository.
himanshug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2560bf0 Add new coordinator metrics for coordinator duty runtimes
(#10603)
2560bf0 is described below
commit 2560bf0a1919c36b824bd0e4f9286e2899deddd3
Author: Lucas Capistrant <[email protected]>
AuthorDate: Sun Nov 29 16:47:35 2020 -0600
Add new coordinator metrics for coordinator duty runtimes (#10603)
* Add new coordinator metrics for duty runtimes
* fix spelling for a constant variable value
* add comment clarifying why the global runtime metric is emitted where it
is
* Remove duty alias in favor of using the class name for metrics
* fix docs
* CoordinatorStats tests + add duty stats to accumulate() logic
---
docs/operations/metrics.md | 2 +
.../java/org/apache/druid/query/DruidMetrics.java | 3 ++
.../druid/server/coordinator/CoordinatorStats.java | 51 ++++++++++++++++++
.../druid/server/coordinator/DruidCoordinator.java | 28 ++++++++--
.../duty/EmitClusterStatsAndMetrics.java | 32 +++++++++++
.../server/coordinator/CoordinatorStatsTest.java | 63 ++++++++++++++++++++++
.../server/coordinator/DruidCoordinatorTest.java | 2 +-
7 files changed, 176 insertions(+), 5 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 62c0521..68b7f88 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -254,6 +254,8 @@ These metrics are for the Druid Coordinator and are reset
each time the Coordina
|`segment/skipCompact/bytes`|Total bytes of this datasource that are skipped
(not eligible for auto compaction) by the auto compaction.|datasource.|Varies.|
|`segment/skipCompact/count`|Total number of segments of this datasource that
are skipped (not eligible for auto compaction) by the auto
compaction.|datasource.|Varies.|
|`interval/skipCompact/count`|Total number of intervals of this datasource
that are skipped (not eligible for auto compaction) by the auto
compaction.|datasource.|Varies.|
+|`coordinator/time`|Approximate Coordinator duty runtime in milliseconds. The
`duty` dimension is the fully qualified class name of the Duty that is being run.|duty.|Varies.|
+|`coordinator/global/time`|Approximate runtime of a full coordination cycle in
milliseconds. The `dutyGroup` dimension indicates what type of coordination
this run was, e.g. Historical Management vs. Indexing.|`dutyGroup`|Varies.|
If `emitBalancingStats` is set to `true` in the Coordinator [dynamic
configuration](
../configuration/index.html#dynamic-configuration), then [log
entries](../configuration/logging.md) for class
diff --git a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
index b1e49af..1e31c5f 100644
--- a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
@@ -42,6 +42,9 @@ public class DruidMetrics
public static final String SERVER = "server";
public static final String TIER = "tier";
+ public static final String DUTY = "duty";
+ public static final String DUTY_GROUP = "dutyGroup";
+
public static int findNumComplexAggs(List<AggregatorFactory> aggs)
{
int retVal = 0;
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorStats.java
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorStats.java
index fa10f20..357cf45 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorStats.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorStats.java
@@ -35,12 +35,14 @@ public class CoordinatorStats
{
private final Map<String, Object2LongOpenHashMap<String>> perTierStats;
private final Map<String, Object2LongOpenHashMap<String>> perDataSourceStats;
+ private final Map<String, Object2LongOpenHashMap<String>> perDutyStats;
private final Object2LongOpenHashMap<String> globalStats;
public CoordinatorStats()
{
perTierStats = new HashMap<>();
perDataSourceStats = new HashMap<>();
+ perDutyStats = new HashMap<>();
globalStats = new Object2LongOpenHashMap<>();
}
@@ -54,6 +56,11 @@ public class CoordinatorStats
return !perDataSourceStats.isEmpty();
}
+ public boolean hasPerDutyStats()
+ {
+ return !perDutyStats.isEmpty();
+ }
+
public Set<String> getTiers(final String statName)
{
final Object2LongOpenHashMap<String> theStat = perTierStats.get(statName);
@@ -72,6 +79,15 @@ public class CoordinatorStats
return Collections.unmodifiableSet(stat.keySet());
}
+ public Set<String> getDuties(String statName)
+ {
+ final Object2LongOpenHashMap<String> stat = perDutyStats.get(statName);
+ if (stat == null) {
+ return Collections.emptySet();
+ }
+ return Collections.unmodifiableSet(stat.keySet());
+ }
+
/**
*
* @param statName the name of the statistics
@@ -109,6 +125,21 @@ public class CoordinatorStats
}
}
+ public long getDutyStat(String statName, String duty)
+ {
+ return perDutyStats.get(statName).getLong(duty);
+ }
+
+ public void forEachDutyStat(String statName, ObjLongConsumer<String>
consumer)
+ {
+ final Object2LongOpenHashMap<String> stat = perDutyStats.get(statName);
+ if (stat != null) {
+ for (Entry<String> entry : stat.object2LongEntrySet()) {
+ consumer.accept(entry.getKey(), entry.getLongValue());
+ }
+ }
+ }
+
public long getGlobalStat(final String statName)
{
return globalStats.getLong(statName);
@@ -132,6 +163,12 @@ public class CoordinatorStats
.addTo(dataSource, value);
}
+ public void addToDutyStat(String statName, String duty, long value)
+ {
+ perDutyStats.computeIfAbsent(statName, k -> new Object2LongOpenHashMap<>())
+ .addTo(duty, value);
+ }
+
public void addToGlobalStat(final String statName, final long value)
{
globalStats.addTo(statName, value);
@@ -166,6 +203,20 @@ public class CoordinatorStats
}
);
+ stats.perDutyStats.forEach(
+ (statName, urStat) -> {
+ final Object2LongOpenHashMap<String> myStat =
perDutyStats.computeIfAbsent(
+ statName,
+ k -> new Object2LongOpenHashMap<>()
+ );
+
+ for (Entry<String> entry : urStat.object2LongEntrySet()) {
+ myStat.addTo(entry.getKey(), entry.getLongValue());
+ }
+ }
+
+ );
+
for (final Object2LongMap.Entry<String> entry :
stats.globalStats.object2LongEntrySet()) {
globalStats.addTo(entry.getKey(), entry.getLongValue());
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 678f440..2b6e8c2 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -60,8 +60,10 @@ import
org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.metadata.SegmentsMetadataManager;
+import org.apache.druid.query.DruidMetrics;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.duty.BalanceSegments;
import org.apache.druid.server.coordinator.duty.CompactSegments;
@@ -93,6 +95,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@@ -158,6 +161,10 @@ public class DruidCoordinator
private int cachedBalancerThreadNumber;
private ListeningExecutorService balancerExec;
+ private static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP =
"HistoricalManagementDuties";
+ private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP =
"IndexingServiceDuties";
+ private static final String COMPACT_SEGMENTS_DUTIES_DUTY_GROUP =
"CompactSegmentsDuties";
+
@Inject
public DruidCoordinator(
DruidCoordinatorConfig config,
@@ -573,7 +580,7 @@ public class DruidCoordinator
public void runCompactSegmentsDuty()
{
final int startingLeaderCounter = coordLeaderSelector.localTerm();
- DutiesRunnable compactSegmentsDuty = new
DutiesRunnable(makeCompactSegmentsDuty(), startingLeaderCounter);
+ DutiesRunnable compactSegmentsDuty = new
DutiesRunnable(makeCompactSegmentsDuty(), startingLeaderCounter,
COMPACT_SEGMENTS_DUTIES_DUTY_GROUP);
compactSegmentsDuty.run();
}
@@ -598,14 +605,14 @@ public class DruidCoordinator
final List<Pair<? extends DutiesRunnable, Duration>> dutiesRunnables =
new ArrayList<>();
dutiesRunnables.add(
Pair.of(
- new DutiesRunnable(makeHistoricalManagementDuties(),
startingLeaderCounter),
+ new DutiesRunnable(makeHistoricalManagementDuties(),
startingLeaderCounter, HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP),
config.getCoordinatorPeriod()
)
);
if (indexingServiceClient != null) {
dutiesRunnables.add(
Pair.of(
- new DutiesRunnable(makeIndexingServiceDuties(),
startingLeaderCounter),
+ new DutiesRunnable(makeIndexingServiceDuties(),
startingLeaderCounter, INDEXING_SERVICE_DUTIES_DUTY_GROUP),
config.getCoordinatorIndexingPeriod()
)
);
@@ -706,11 +713,13 @@ public class DruidCoordinator
private final long startTimeNanos = System.nanoTime();
private final List<CoordinatorDuty> duties;
private final int startingLeaderCounter;
+ private final String dutiesRunnableAlias;
- protected DutiesRunnable(List<CoordinatorDuty> duties, final int
startingLeaderCounter)
+ protected DutiesRunnable(List<CoordinatorDuty> duties, final int
startingLeaderCounter, String alias)
{
this.duties = duties;
this.startingLeaderCounter = startingLeaderCounter;
+ this.dutiesRunnableAlias = alias;
}
@VisibleForTesting
@@ -747,6 +756,7 @@ public class DruidCoordinator
public void run()
{
try {
+ final long globalStart = System.nanoTime();
synchronized (lock) {
if (!coordLeaderSelector.isLeader()) {
log.info("LEGGO MY EGGO. [%s] is leader.",
coordLeaderSelector.getCurrentLeader());
@@ -801,14 +811,24 @@ public class DruidCoordinator
&& coordLeaderSelector.isLeader()
&& startingLeaderCounter == coordLeaderSelector.localTerm()) {
+ final long start = System.nanoTime();
params = duty.run(params);
+ final long end = System.nanoTime();
if (params == null) {
// This duty wanted to cancel the run. No log message, since the
duty should have logged a reason.
return;
+ } else {
+ params.getCoordinatorStats().addToDutyStat("runtime",
duty.getClass().getName(), TimeUnit.NANOSECONDS.toMillis(end - start));
}
}
}
+ // Emit the runtime of the full DutiesRunnable
+ params.getEmitter().emit(
+ new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DUTY_GROUP, dutiesRunnableAlias)
+ .build("coordinator/global/time",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - globalStart))
+ );
}
catch (Exception e) {
log.makeAlert(e, "Caught exception, ignoring so that schedule keeps
going.").emit();
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
index 70bfb04..1f6cab3 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
@@ -97,6 +97,35 @@ public class EmitClusterStatsAndMetrics implements
CoordinatorDuty
);
}
+ private void emitDutyStat(
+ final ServiceEmitter emitter,
+ final String metricName,
+ final String duty,
+ final long value
+ )
+ {
+ emitter.emit(
+ new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DUTY, duty)
+ .build(metricName, value)
+ );
+ }
+
+ private void emitDutyStats(
+ final ServiceEmitter emitter,
+ final String metricName,
+ final CoordinatorStats stats,
+ final String statName
+ )
+ {
+ stats.forEachDutyStat(
+ statName,
+ (final String duty, final long count) -> {
+ emitDutyStat(emitter, metricName, duty, count);
+ }
+ );
+ }
+
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams
params)
{
@@ -435,6 +464,9 @@ public class EmitClusterStatsAndMetrics implements
CoordinatorDuty
}
);
+ // Emit coordinator runtime stats
+ emitDutyStats(emitter, "coordinator/time", stats, "runtime");
+
return params;
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorStatsTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorStatsTest.java
index 11fcfd1..00dc094 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorStatsTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorStatsTest.java
@@ -116,12 +116,18 @@ public class CoordinatorStatsTest
stats.addToTieredStat("stat1", "tier1", 1);
stats.addToTieredStat("stat1", "tier2", 1);
stats.addToTieredStat("stat2", "tier1", 1);
+ stats.addToDutyStat("stat1", "duty1", 1);
+ stats.addToDutyStat("stat1", "duty2", 1);
+ stats.addToDutyStat("stat2", "duty1", 1);
final CoordinatorStats stats2 = new CoordinatorStats();
stats2.addToGlobalStat("stat1", 1);
stats2.addToTieredStat("stat1", "tier2", 1);
stats2.addToTieredStat("stat2", "tier2", 1);
stats2.addToTieredStat("stat3", "tier1", 1);
+ stats2.addToDutyStat("stat1", "duty2", 1);
+ stats2.addToDutyStat("stat2", "duty2", 1);
+ stats2.addToDutyStat("stat3", "duty1", 1);
stats.accumulate(stats2);
@@ -132,6 +138,11 @@ public class CoordinatorStatsTest
Assert.assertEquals(1, stats.getTieredStat("stat2", "tier1"));
Assert.assertEquals(1, stats.getTieredStat("stat2", "tier2"));
Assert.assertEquals(1, stats.getTieredStat("stat3", "tier1"));
+ Assert.assertEquals(1, stats.getDutyStat("stat1", "duty1"));
+ Assert.assertEquals(2, stats.getDutyStat("stat1", "duty2"));
+ Assert.assertEquals(1, stats.getDutyStat("stat2", "duty1"));
+ Assert.assertEquals(1, stats.getDutyStat("stat2", "duty2"));
+ Assert.assertEquals(1, stats.getDutyStat("stat3", "duty1"));
}
@Test
@@ -167,4 +178,56 @@ public class CoordinatorStatsTest
Assert.assertEquals(10, stats.getTieredStat("stat1", "tier2"));
}
+
+ @Test(expected = NullPointerException.class)
+ public void testGetNonexistentDutyStat()
+ {
+ stats.getDutyStat("stat", "duty");
+ }
+
+ @Test
+ public void testAddToDutyStat()
+ {
+ Assert.assertFalse(stats.hasPerDutyStats());
+ stats.addToDutyStat("stat1", "duty1", 1);
+ stats.addToDutyStat("stat1", "duty2", 1);
+ stats.addToDutyStat("stat1", "duty1", -5);
+ stats.addToDutyStat("stat2", "duty1", 1);
+ stats.addToDutyStat("stat1", "duty2", 1);
+ Assert.assertTrue(stats.hasPerDutyStats());
+
+ Assert.assertEquals(
+ Sets.newHashSet("duty1", "duty2"),
+ stats.getDuties("stat1")
+ );
+ Assert.assertEquals(
+ Sets.newHashSet("duty1"),
+ stats.getDuties("stat2")
+ );
+ Assert.assertTrue(stats.getDuties("stat3").isEmpty());
+
+ Assert.assertEquals(-4, stats.getDutyStat("stat1", "duty1"));
+ Assert.assertEquals(2, stats.getDutyStat("stat1", "duty2"));
+ Assert.assertEquals(1, stats.getDutyStat("stat2", "duty1"));
+ }
+
+ @Test
+ public void testForEachDutyStat()
+ {
+ final Map<String, Long> expected = ImmutableMap.of(
+ "duty1", 1L,
+ "duty2", 2L,
+ "duty3", 3L
+ );
+ final Map<String, Long> actual = new HashMap<>();
+
+ expected.forEach(
+ (duty, count) -> stats.addToDutyStat("stat", duty, count)
+ );
+
+ stats.forEachDutyStat("stat0", (duty, count) -> Assert.fail());
+ stats.forEachDutyStat("stat", actual::put);
+
+ Assert.assertEquals(expected, actual);
+ }
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 34fe944..31bfeec 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -709,7 +709,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
ZkEnablementConfig.ENABLED
);
- DruidCoordinator.DutiesRunnable duty = c.new
DutiesRunnable(Collections.emptyList(), 0);
+ DruidCoordinator.DutiesRunnable duty = c.new
DutiesRunnable(Collections.emptyList(), 0, "TEST");
// before initialization
Assert.assertEquals(0, c.getCachedBalancerThreadNumber());
Assert.assertNull(c.getBalancerExec());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]