kfaraz commented on code in PR #18032:
URL: https://github.com/apache/druid/pull/18032#discussion_r2106380024
##########
server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java:
##########
@@ -61,12 +62,14 @@ public boolean doMonitor(ServiceEmitter emitter)
return true;
}
- private void emit(ServiceEmitter emitter, String key, Map<String, Long> counts)
+ private void emit(ServiceEmitter emitter, String key, Map<RowKey, Long> counts)
{
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
if (counts != null) {
counts.forEach((k, v) -> {
- builder.setDimension("dataSource", k);
+ for (final Map.Entry<Dimension, String> entry : k.getValues().entrySet()) {
+ builder.setDimension(entry.getKey().reportedName(), entry.getValue());
+ }
Review Comment:
Nit: this can use the `forEach` shorthand, since this code block already uses `forEach`:
```suggestion
k.getValues().forEach((dim, value) -> builder.setDimension(dim.reportedName(), value));
```
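To illustrate that the two forms are equivalent, here is a minimal, self-contained sketch. `StubBuilder` is a hypothetical stand-in for `ServiceMetricEvent.Builder` that only records dimensions, and plain `String` keys replace `Dimension`; neither is Druid code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ForEachShorthand
{
  // Hypothetical stand-in for ServiceMetricEvent.Builder; it only records dimensions.
  static class StubBuilder
  {
    final Map<String, String> dimensions = new LinkedHashMap<>();

    StubBuilder setDimension(String name, String value)
    {
      dimensions.put(name, value);
      return this;
    }
  }

  // Explicit loop over entrySet(), in the style used in the PR.
  static Map<String, String> withLoop(Map<String, String> values)
  {
    final StubBuilder builder = new StubBuilder();
    for (final Map.Entry<String, String> entry : values.entrySet()) {
      builder.setDimension(entry.getKey(), entry.getValue());
    }
    return builder.dimensions;
  }

  // Map.forEach shorthand, in the style of the suggestion.
  static Map<String, String> withForEach(Map<String, String> values)
  {
    final StubBuilder builder = new StubBuilder();
    values.forEach((dim, value) -> builder.setDimension(dim, value));
    return builder.dimensions;
  }
}
```

Both methods visit the same entries in the map's iteration order, so the recorded dimensions are identical.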
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -904,58 +906,56 @@ private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Lo
return deltaValues;
}
- public Map<String, Long> getSuccessfulTaskCount()
+ public Map<RowKey, Long> getSuccessfulTaskCount()
{
- Map<String, Long> total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get);
+ final Map<RowKey, Long> total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get);
synchronized (totalSuccessfulTaskCount) {
- Map<String, Long> delta = getDeltaValues(total, prevTotalSuccessfulTaskCount);
+ Map<RowKey, Long> delta = getDeltaValues(total, prevTotalSuccessfulTaskCount);
prevTotalSuccessfulTaskCount = total;
return delta;
}
}
- public Map<String, Long> getFailedTaskCount()
+ public Map<RowKey, Long> getFailedTaskCount()
{
- Map<String, Long> total = CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get);
+ final Map<RowKey, Long> total = CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get);
synchronized (totalFailedTaskCount) {
- Map<String, Long> delta = getDeltaValues(total, prevTotalFailedTaskCount);
+ Map<RowKey, Long> delta = getDeltaValues(total, prevTotalFailedTaskCount);
prevTotalFailedTaskCount = total;
return delta;
}
}
- private Map<String, String> getCurrentTaskDatasources()
+ private Map<String, RowKey> getCurrentTaskDatasources()
{
- return activeTasks.values().stream()
- .map(entry -> entry.task)
- .collect(Collectors.toMap(Task::getId, Task::getDataSource));
+ return activeTasks.values().stream().filter(entry -> !entry.isComplete).map(entry -> entry.task).collect(Collectors.toMap(Task::getId, TaskQueue::getMetricKey));
Review Comment:
This read more cleanly when the stream chain was broken over multiple lines, one call per line.
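One possible layout, shown as a self-contained sketch. `Task` and `TaskEntry` here are hypothetical stand-ins for the Druid classes, and the map value is the datasource name rather than a `RowKey`, purely to keep the example small.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class StreamChainLayout
{
  // Hypothetical stand-in for Druid's Task; only the two getters used here.
  static class Task
  {
    private final String id;
    private final String dataSource;

    Task(String id, String dataSource)
    {
      this.id = id;
      this.dataSource = dataSource;
    }

    String getId()
    {
      return id;
    }

    String getDataSource()
    {
      return dataSource;
    }
  }

  // Hypothetical stand-in for TaskQueue.TaskEntry.
  static class TaskEntry
  {
    final Task task;
    final boolean isComplete;

    TaskEntry(Task task, boolean isComplete)
    {
      this.task = task;
      this.isComplete = isComplete;
    }
  }

  // One stream operation per line: skip completed entries, unwrap the task, key by task id.
  static Map<String, String> currentTaskDatasources(List<TaskEntry> activeTasks)
  {
    return activeTasks.stream()
                      .filter(entry -> !entry.isComplete)
                      .map(entry -> entry.task)
                      .collect(Collectors.toMap(Task::getId, Task::getDataSource));
  }
}
```

With one operation per line, the newly added `filter` step is visible at a glance in the diff.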
##########
server/src/main/java/org/apache/druid/server/coordinator/stats/Dimension.java:
##########
@@ -25,6 +25,7 @@
public enum Dimension
{
TIER("tier"),
+ TASK_TYPE("taskType"),
Review Comment:
Yes, that is the eventual goal. It is part of making `CoordinatorRunStats` available to all of Druid, but I don't want to rush it until I have had a chance to think about it some more.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -1136,4 +1136,16 @@ static class TaskEntry
this.lastUpdatedTime = DateTimes.nowUtc();
}
}
+
+ private static RowKey getMetricKey(final Task task)
+ {
+ if (task == null) {
+ return RowKey.empty();
+ }
+ RowKey.Builder builder = new RowKey.Builder();
+ builder.with(Dimension.DATASOURCE, task.getDataSource());
+ builder.with(Dimension.TASK_TYPE, task.getType());
+
+ return builder.build();
Review Comment:
You can use the shorthand:
```suggestion
return RowKey.with(Dimension.DATASOURCE, task.getDataSource())
.and(Dimension.TASK_TYPE, task.getType());
```
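For readers unfamiliar with this fluent style, here is a rough sketch of how such a `with(...).and(...)` chain can work. The `RowKey` and `Dimension` classes below are simplified, hypothetical stand-ins written for this example, modeled only on the calls visible in this diff; they are not Druid's actual implementation.

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;

class RowKeySketch
{
  // Hypothetical stand-in for Druid's Dimension enum, reduced to two values.
  enum Dimension
  {
    DATASOURCE,
    TASK_TYPE
  }

  // Simplified stand-in for RowKey: an immutable map of dimension values.
  static final class RowKey
  {
    private final Map<Dimension, String> values;

    private RowKey(Map<Dimension, String> values)
    {
      this.values = Collections.unmodifiableMap(new EnumMap<Dimension, String>(values));
    }

    // Starts a builder with the first dimension already set.
    static Builder with(Dimension dimension, String value)
    {
      return new Builder().with(dimension, value);
    }

    Map<Dimension, String> getValues()
    {
      return values;
    }

    static final class Builder
    {
      private final Map<Dimension, String> values = new EnumMap<>(Dimension.class);

      Builder with(Dimension dimension, String value)
      {
        values.put(dimension, value);
        return this;
      }

      // Adds the last dimension and finishes the key in one call.
      RowKey and(Dimension dimension, String value)
      {
        values.put(dimension, value);
        return build();
      }

      RowKey build()
      {
        return new RowKey(values);
      }
    }
  }

  // Step-by-step builder usage, in the style used in the PR.
  static RowKey verbose(String dataSource, String type)
  {
    RowKey.Builder builder = new RowKey.Builder();
    builder.with(Dimension.DATASOURCE, dataSource);
    builder.with(Dimension.TASK_TYPE, type);
    return builder.build();
  }

  // Fluent chain, in the style of the suggestion.
  static RowKey shorthand(String dataSource, String type)
  {
    return RowKey.with(Dimension.DATASOURCE, dataSource)
                 .and(Dimension.TASK_TYPE, type);
  }
}
```

In this sketch, both methods produce keys with the same dimension values; the fluent form just avoids the local builder variable.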
##########
server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java:
##########
@@ -85,12 +93,17 @@ public void testMonitor()
final TaskCountStatsMonitor monitor = new TaskCountStatsMonitor(statsProvider);
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
monitor.doMonitor(emitter);
- Assert.assertEquals(7, emitter.getEvents().size());
- emitter.verifyValue("task/success/count", 1L);
- emitter.verifyValue("task/failed/count", 1L);
- emitter.verifyValue("task/running/count", 1L);
- emitter.verifyValue("task/pending/count", 1L);
- emitter.verifyValue("task/waiting/count", 1L);
+
+ Assert.assertEquals(9, emitter.getEvents().size());
+
+ emitter.verifyValue("task/success/count", ImmutableMap.of("dataSource", "d1", "taskType", "index"), 1L);
Review Comment:
For brevity:
```suggestion
emitter.verifyValue("task/success/count", Map.of("dataSource", "d1", "taskType", "index"), 1L);
```
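As background, `Map.of` is the JDK factory available since Java 9. Like `ImmutableMap.of`, it returns an unmodifiable map and rejects null keys and values, though its iteration order is unspecified. A small sketch of the relevant behavior, where the class and method names are illustrative only:

```java
import java.util.HashMap;
import java.util.Map;

class MapOfSketch
{
  // Builds the expected dimensions with the JDK factory (Java 9+).
  static Map<String, String> expectedDimensions()
  {
    return Map.of("dataSource", "d1", "taskType", "index");
  }

  // Returns true if the map rejects modification.
  // Note: this adds an entry to the map if it turns out to be modifiable.
  static boolean isUnmodifiable(Map<String, String> map)
  {
    try {
      map.put("extra", "value");
      return false;
    }
    catch (UnsupportedOperationException e) {
      return true;
    }
  }
}
```

Map equality depends only on the entries, so a `Map.of(...)` instance compares equal to any other `Map` holding the same key-value pairs.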