This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 64af9bfe5b Add groupId to metrics (#14402)
64af9bfe5b is described below
commit 64af9bfe5bfcbe81e865c7c2bb3b898297b1d76c
Author: George Shiqi Wu <[email protected]>
AuthorDate: Fri Jun 16 12:28:16 2023 -0400
Add groupId to metrics (#14402)
* Add group id as a dimension
* Revert changes
* Add to forking task runner
* Add missing metrics
* Fix indenting
* revert metrics
* Fix indentation
---
docs/operations/metrics.md | 60 +++++++++++-----------
.../common/TaskRealtimeMetricsMonitorBuilder.java | 3 +-
.../common/task/AbstractBatchIndexTask.java | 1 +
.../druid/indexing/common/task/IndexTaskUtils.java | 2 +
.../druid/indexing/overlord/ForkingTaskRunner.java | 4 ++
.../indexing/common/task/IndexTaskUtilsTest.java | 34 ++++++++++++
.../java/org/apache/druid/query/DruidMetrics.java | 1 +
7 files changed, 74 insertions(+), 31 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index cb786abb82..16aaca9083 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -154,9 +154,9 @@ If SQL is enabled, the Broker will emit the following metrics for SQL.
 |Metric|Description| Dimensions |Normal Value|
 |------|-----------|---------------------------------------------------------|------------|
-|`ingest/count`|Count of `1` every time an ingestion job runs (includes compaction jobs). Aggregate using dimensions. | `dataSource`, `taskId`, `taskType`, `taskIngestionMode`, `tags` |Always `1`.|
-|`ingest/segments/count`|Count of final segments created by job (includes tombstones). | `dataSource`, `taskId`, `taskType`, `taskIngestionMode`, `tags` |At least `1`.|
-|`ingest/tombstones/count`|Count of tombstones created by job. | `dataSource`, `taskId`, `taskType`, `taskIngestionMode`, `tags` |Zero or more for replace. Always zero for non-replace tasks (always zero for legacy replace, see below).|
+|`ingest/count`|Count of `1` every time an ingestion job runs (includes compaction jobs). Aggregate using dimensions. | `dataSource`, `taskId`, `taskType`, `groupId`, `taskIngestionMode`, `tags` |Always `1`.|
+|`ingest/segments/count`|Count of final segments created by job (includes tombstones). | `dataSource`, `taskId`, `taskType`, `groupId`, `taskIngestionMode`, `tags` |At least `1`.|
+|`ingest/tombstones/count`|Count of tombstones created by job. | `dataSource`, `taskId`, `taskType`, `groupId`, `taskIngestionMode`, `tags` |Zero or more for replace. Always zero for non-replace tasks (always zero for legacy replace, see below).|
 
 The `taskIngestionMode` dimension includes the following modes:
 
 * `APPEND`: a native ingestion job appending to existing segments
@@ -206,26 +206,26 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
 |Metric|Description|Dimensions|Normal Value|
 |------|-----------|----------|------------|
-|`ingest/events/thrownAway`|Number of events rejected because they are either null, or filtered by the transform spec, or outside the windowPeriod.|`dataSource`, `taskId`, `taskType`, `tags`|0|
-|`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|`dataSource`, `taskId`, `taskType`, `tags`|0|
-|`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|`dataSource`, `taskId`, `taskType`, `tags`|0|
-|`ingest/events/processed`|Number of events successfully processed per emission period.|`dataSource`, `taskId`, `taskType`, `tags`|Equal to the number of events per emission period.|
-|`ingest/rows/output`|Number of Druid rows persisted.|`dataSource`, `taskId`, `taskType`|Your number of events with rollup.|
-|`ingest/persists/count`|Number of times persist occurred.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration.|
-|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. Generally a few minutes at most.|
-|`ingest/persists/cpu`|Cpu time in Nanoseconds spent on doing intermediate persist.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. Generally a few minutes at most.|
-|`ingest/persists/backPressure`|Milliseconds spent creating persist tasks and blocking waiting for them to finish.|`dataSource`, `taskId`, `taskType`, `tags`|0 or very low|
-|`ingest/persists/failed`|Number of persists that failed.|`dataSource`, `taskId`, `taskType`, `tags`|0|
-|`ingest/handoff/failed`|Number of handoffs that failed.|`dataSource`, `taskId`, `taskType`, `tags`|0|
-|`ingest/merge/time`|Milliseconds spent merging intermediate segments.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. Generally a few minutes at most.|
-|`ingest/merge/cpu`|Cpu time in Nanoseconds spent on merging intermediate segments.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. Generally a few minutes at most.|
-|`ingest/handoff/count`|Number of handoffs that happened.|`dataSource`, `taskId`, `taskType`, `tags`|Varies. Generally greater than 0 once every segment granular period if cluster operating normally.|
-|`ingest/sink/count`|Number of sinks not handoffed.|`dataSource`, `taskId`, `taskType`, `tags`|1~3|
-|`ingest/events/messageGap`|Time gap in milliseconds between the latest ingested event timestamp and the current system timestamp of metrics emission. If the value is increasing but lag is low, Druid may not be receiving new data. This metric is reset as new tasks spawn up.|`dataSource`, `taskId`, `taskType`, `tags`|Greater than 0, depends on the time carried in event. |
+|`ingest/events/thrownAway`|Number of events rejected because they are either null, or filtered by the transform spec, or outside the windowPeriod.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
+|`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
+|`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
+|`ingest/events/processed`|Number of events successfully processed per emission period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Equal to the number of events per emission period.|
+|`ingest/rows/output`|Number of Druid rows persisted.|`dataSource`, `taskId`, `taskType`, `groupId`|Your number of events with rollup.|
+|`ingest/persists/count`|Number of times persist occurred.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on configuration.|
+|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on configuration. Generally a few minutes at most.|
+|`ingest/persists/cpu`|Cpu time in Nanoseconds spent on doing intermediate persist.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on configuration. Generally a few minutes at most.|
+|`ingest/persists/backPressure`|Milliseconds spent creating persist tasks and blocking waiting for them to finish.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0 or very low|
+|`ingest/persists/failed`|Number of persists that failed.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
+|`ingest/handoff/failed`|Number of handoffs that failed.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
+|`ingest/merge/time`|Milliseconds spent merging intermediate segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on configuration. Generally a few minutes at most.|
+|`ingest/merge/cpu`|Cpu time in Nanoseconds spent on merging intermediate segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on configuration. Generally a few minutes at most.|
+|`ingest/handoff/count`|Number of handoffs that happened.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Varies. Generally greater than 0 once every segment granular period if cluster operating normally.|
+|`ingest/sink/count`|Number of sinks not handoffed.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|1~3|
+|`ingest/events/messageGap`|Time gap in milliseconds between the latest ingested event timestamp and the current system timestamp of metrics emission. If the value is increasing but lag is low, Druid may not be receiving new data. This metric is reset as new tasks spawn up.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Greater than 0, depends on the time carried in event. |
 |`ingest/notices/queueSize`|Number of pending notices to be processed by the coordinator.|`dataSource`, `tags`|Typically 0 and occasionally in lower single digits. Should not be a very high number. |
 |`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`dataSource`, `tags`| < 1s |
 |`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
-|`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on coordinator cycle time.|
+|`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on coordinator cycle time.|
 
 Note: If the JVM does not support CPU time measurement for the current thread, `ingest/merge/cpu` and `ingest/persists/cpu` will be 0.
@@ -233,20 +233,20 @@ Note: If the JVM does not support CPU time measurement for the current thread, `
 |Metric|Description| Dimensions |Normal Value|
 |------|-----------|------------------------------------------------------------|------------|
-|`task/run/time`|Milliseconds taken to run a task.| `dataSource`, `taskId`, `taskType`, `taskStatus`, `tags`|Varies|
-|`task/pending/time`|Milliseconds taken for a task to wait for running.| `dataSource`, `taskId`, `taskType`, `tags`|Varies|
-|`task/action/log/time`|Milliseconds taken to log a task action to the audit log.| `dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|< 1000 (subsecond)|
-|`task/action/run/time`|Milliseconds taken to execute a task action.| `dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|Varies from subsecond to a few seconds, based on action type.|
-|`task/action/success/count`|Number of task actions that were executed successfully during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|Varies|
-|`task/action/failed/count`|Number of task actions that failed during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|Varies|
+|`task/run/time`|Milliseconds taken to run a task.| `dataSource`, `taskId`, `taskType`, `groupId`, `taskStatus`, `tags`|Varies|
+|`task/pending/time`|Milliseconds taken for a task to wait for running.| `dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Varies|
+|`task/action/log/time`|Milliseconds taken to log a task action to the audit log.| `dataSource`, `taskId`, `taskType`, `groupId`, `taskActionType`, `tags`|< 1000 (subsecond)|
+|`task/action/run/time`|Milliseconds taken to execute a task action.| `dataSource`, `taskId`, `taskType`, `groupId`, `taskActionType`, `tags`|Varies from subsecond to a few seconds, based on action type.|
+|`task/action/success/count`|Number of task actions that were executed successfully during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `groupId`, `taskActionType`, `tags`|Varies|
+|`task/action/failed/count`|Number of task actions that failed during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `groupId`, `taskActionType`, `tags`|Varies|
 |`task/action/batch/queueTime`|Milliseconds spent by a batch of task actions in queue. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies based on the `batchAllocationWaitTime` and number of batches in queue.|
 |`task/action/batch/runTime`|Milliseconds taken to execute a batch of task actions. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies from subsecond to a few seconds, based on action type and batch size.|
 |`task/action/batch/size`|Number of task actions in a batch that was executed during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies based on number of concurrent task actions.|
 |`task/action/batch/attempts`|Number of execution attempts for a single batch of task actions. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|1 if there are no failures or retries.|
-|`task/segmentAvailability/wait/time`|The amount of milliseconds a batch indexing task waited for newly created segments to become available for querying.| `dataSource`, `taskType`, `taskId`, `segmentAvailabilityConfirmed`, `tags`|Varies|
-|`segment/added/bytes`|Size in bytes of new segments created.| `dataSource`, `taskId`, `taskType`, `interval`, `tags`|Varies|
-|`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.| `dataSource`, `taskId`, `taskType`, `interval`, `tags`|Varies|
-|`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.| `dataSource`, `taskId`, `taskType`, `interval`, `tags`|Varies|
+|`task/segmentAvailability/wait/time`|The amount of milliseconds a batch indexing task waited for newly created segments to become available for querying.| `dataSource`, `taskType`, `groupId`, `taskId`, `segmentAvailabilityConfirmed`, `tags`|Varies|
+|`segment/added/bytes`|Size in bytes of new segments created.| `dataSource`, `taskId`, `taskType`, `groupId`, `interval`, `tags`|Varies|
+|`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.| `dataSource`, `taskId`, `taskType`, `groupId`, `interval`, `tags`|Varies|
+|`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.| `dataSource`, `taskId`, `taskType`, `groupId`, `interval`, `tags`|Varies|
 |`task/success/count`|Number of successful tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.| `dataSource`|Varies|
 |`task/failed/count`|Number of failed tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.|`dataSource`|Varies|
 |`task/running/count`|Number of current running tasks. This metric is only available if the `TaskCountStatsMonitor` module is included.|`dataSource`|Varies|
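The documentation change above is the user-visible effect of this commit: metrics emitted by the sub-tasks of a single ingestion job can now be aggregated on the shared `groupId` dimension instead of only per `taskId`. As a minimal sketch of such a rollup (plain Java for illustration, not Druid code; the event shape and all names here are hypothetical):

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: a metric event reduced to the fields relevant here,
// and a rollup that sums values per groupId so all sub-tasks of one
// ingestion job aggregate together.
public class GroupIdRollup
{
  record MetricEvent(String metric, String taskId, String groupId, long value) {}

  static Map<String, Long> sumByGroupId(List<MetricEvent> events)
  {
    Map<String, Long> totals = new TreeMap<>();
    for (MetricEvent e : events) {
      totals.merge(e.groupId(), e.value(), Long::sum);
    }
    return totals;
  }

  public static void main(String[] args)
  {
    List<MetricEvent> events = List.of(
        new MetricEvent("ingest/rows/output", "task-a-0", "job-a", 100),
        new MetricEvent("ingest/rows/output", "task-a-1", "job-a", 50),
        new MetricEvent("ingest/rows/output", "task-b-0", "job-b", 70)
    );
    System.out.println(sumByGroupId(events)); // {job-a=150, job-b=70}
  }
}
```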
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java
index a07ad4eaad..abd9516907 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java
@@ -56,7 +56,8 @@ public class TaskRealtimeMetricsMonitorBuilder
         meters,
         ImmutableMap.of(
             DruidMetrics.TASK_ID, new String[]{task.getId()},
-            DruidMetrics.TASK_TYPE, new String[]{task.getType()}
+            DruidMetrics.TASK_TYPE, new String[]{task.getType()},
+            DruidMetrics.GROUP_ID, new String[]{task.getGroupId()}
         ),
         task.getContextValue(DruidMetrics.TAGS)
     );
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index a4cd183c01..11bcfb6de7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -650,6 +650,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
             .setDimension("dataSource", getDataSource())
             .setDimension("taskType", getType())
             .setDimension("taskId", getId())
+            .setDimension("groupId", getGroupId())
             .setDimensionIfNotNull(DruidMetrics.TAGS, getContextValue(DruidMetrics.TAGS))
             .setDimension("segmentAvailabilityConfirmed", segmentAvailabilityConfirmationCompleted)
             .build("task/segmentAvailability/wait/time", segmentAvailabilityWaitTimeMs)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
index 05be8c1941..cd7a52f772 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
@@ -117,6 +117,7 @@ public class IndexTaskUtils
         DruidMetrics.TAGS,
         task.<Map<String, Object>>getContextValue(DruidMetrics.TAGS)
     );
+    metricBuilder.setDimensionIfNotNull(DruidMetrics.GROUP_ID, task.getGroupId());
   }
 
   public static void setTaskDimensions(final ServiceMetricEvent.Builder metricBuilder, final AbstractTask task)
@@ -129,6 +130,7 @@ public class IndexTaskUtils
         DruidMetrics.TAGS,
         task.<Map<String, Object>>getContextValue(DruidMetrics.TAGS)
     );
+    metricBuilder.setDimensionIfNotNull(DruidMetrics.GROUP_ID, task.getGroupId());
   }
 
   public static void setTaskStatusDimensions(
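The `setDimensionIfNotNull` calls added above mean that a task without a group ID simply omits the dimension rather than emitting a literal null. A stripped-down stand-in for that builder behavior (illustrative only; this is not Druid's `ServiceMetricEvent.Builder`):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: a toy builder mirroring the null-skipping contract
// of setDimensionIfNotNull. A null value leaves the dimension unset.
public class EventBuilder
{
  private final Map<String, String> dims = new LinkedHashMap<>();

  EventBuilder setDimensionIfNotNull(String key, String value)
  {
    if (value != null) {
      dims.put(key, value);
    }
    return this;
  }

  String getDimension(String key)
  {
    return dims.get(key);
  }

  public static void main(String[] args)
  {
    EventBuilder withGroup = new EventBuilder().setDimensionIfNotNull("groupId", "job-123");
    EventBuilder withoutGroup = new EventBuilder().setDimensionIfNotNull("groupId", null);
    System.out.println(withGroup.getDimension("groupId"));    // job-123
    System.out.println(withoutGroup.getDimension("groupId")); // null
  }
}
```

This is the same contract the new `IndexTaskUtilsTest` cases below exercise against the real builder.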
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 8df3a69eee..c2f2c0363e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -314,6 +314,10 @@ public class ForkingTaskRunner
                 MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.TASK_TYPE,
                 task.getType()
             );
+            command.addSystemProperty(
+                MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.GROUP_ID,
+                task.getGroupId()
+            );
 
             command.addSystemProperty("druid.host", childHost);
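The `ForkingTaskRunner` change above hands the `groupId` to the forked peon JVM as a prefixed system property, so the child process can attach it as a metric dimension. A rough sketch of that read-back mechanism (illustrative only; the `metric.dimension.` prefix here is a made-up stand-in, not the actual value of `MonitorsConfig.METRIC_DIMENSION_PREFIX`):

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Illustrative only: collect every system property under an agreed prefix
// back into a metric-dimension map, the way a child process's monitors
// might recover dimensions set by its parent via -D flags.
public class DimensionProps
{
  static final String PREFIX = "metric.dimension."; // hypothetical prefix

  static Map<String, String> extractDimensions(Properties props)
  {
    Map<String, String> dims = new TreeMap<>();
    for (String name : props.stringPropertyNames()) {
      if (name.startsWith(PREFIX)) {
        dims.put(name.substring(PREFIX.length()), props.getProperty(name));
      }
    }
    return dims;
  }

  public static void main(String[] args)
  {
    Properties props = new Properties();
    props.setProperty(PREFIX + "taskType", "index_parallel");
    props.setProperty(PREFIX + "groupId", "job-123");
    props.setProperty("unrelated.key", "x");
    System.out.println(extractDimensions(props)); // {groupId=job-123, taskType=index_parallel}
  }
}
```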
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskUtilsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskUtilsTest.java
index 8543f893fd..025b3ee09e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskUtilsTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskUtilsTest.java
@@ -36,6 +36,8 @@ import java.util.Map;
 public class IndexTaskUtilsTest
 {
   private static final Map<String, Object> METRIC_TAGS = ImmutableMap.of("k1", "v1", "k2", 20);
+
+  private static final String GROUP_ID = "groupId123";
   @Mock
   private Task task;
   @Mock
@@ -47,7 +49,9 @@ public class IndexTaskUtilsTest
   {
     metricBuilder = ServiceMetricEvent.builder();
     Mockito.when(task.getContextValue(DruidMetrics.TAGS)).thenReturn(METRIC_TAGS);
+    Mockito.when(task.getGroupId()).thenReturn(GROUP_ID);
     Mockito.when(abstractTask.getContextValue(DruidMetrics.TAGS)).thenReturn(METRIC_TAGS);
+    Mockito.when(abstractTask.getGroupId()).thenReturn(GROUP_ID);
   }
 
   @Test
@@ -79,4 +83,34 @@ public class IndexTaskUtilsTest
     IndexTaskUtils.setTaskDimensions(metricBuilder, abstractTask);
     Assert.assertNull(metricBuilder.getDimension(DruidMetrics.TAGS));
   }
+
+  @Test
+  public void testSetTaskDimensionsWithGroupIdShouldSetGroupId()
+  {
+    IndexTaskUtils.setTaskDimensions(metricBuilder, task);
+    Assert.assertEquals(GROUP_ID, metricBuilder.getDimension(DruidMetrics.GROUP_ID));
+  }
+
+  @Test
+  public void testSetTaskDimensionsWithoutGroupIdShouldNotSetGroupId()
+  {
+    Mockito.when(task.getGroupId()).thenReturn(null);
+    IndexTaskUtils.setTaskDimensions(metricBuilder, task);
+    Assert.assertNull(metricBuilder.getDimension(DruidMetrics.GROUP_ID));
+  }
+
+  @Test
+  public void testSetTaskDimensionsForAbstractTaskWithGroupIdShouldSetGroupId()
+  {
+    IndexTaskUtils.setTaskDimensions(metricBuilder, abstractTask);
+    Assert.assertEquals(GROUP_ID, metricBuilder.getDimension(DruidMetrics.GROUP_ID));
+  }
+
+  @Test
+  public void testSetTaskDimensionsForAbstractTaskWithoutGroupIdShouldNotSetGroupId()
+  {
+    Mockito.when(abstractTask.getGroupId()).thenReturn(null);
+    IndexTaskUtils.setTaskDimensions(metricBuilder, abstractTask);
+    Assert.assertNull(metricBuilder.getDimension(DruidMetrics.GROUP_ID));
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
index 50482ce7f4..5caf90d3fd 100644
--- a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
@@ -33,6 +33,7 @@ public class DruidMetrics
   public static final String INTERVAL = "interval";
   public static final String ID = "id";
   public static final String TASK_ID = "taskId";
+  public static final String GROUP_ID = "groupId";
   public static final String STATUS = "status";
   public static final String TASK_INGESTION_MODE = "taskIngestionMode";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]