This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a2013e6566 Enhance streaming ingestion metrics (#13331)
a2013e6566 is described below
commit a2013e656678146094f6c9039471d3933c81313f
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Wed Nov 9 23:44:15 2022 +0530
Enhance streaming ingestion metrics (#13331)
Changes:
- Add a metric for partition-wise Kafka/Kinesis lag for streaming ingestion.
- Add a `stream` dimension to the existing streaming ingestion lag metrics.
- Emit lag metrics for streaming ingestion when the supervisor is not suspended
  and its state is in {RUNNING, IDLE, UNHEALTHY_TASKS, UNHEALTHY_SUPERVISOR}.
- Document the metrics.
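The state gate described in the list above can be sketched as a simple set-membership check. This is a minimal, hypothetical sketch: the `BasicState` enum and the `shouldEmitLag` method are simplified stand-ins, not Druid's `SupervisorStateManager` API. The real check in `SeekableStreamSupervisor.emitLag()` combines `isSteadyState()`, `isIdle()` and the two unhealthy basic states.

```java
import java.util.EnumSet;
import java.util.Set;

/**
 * Hypothetical sketch of the lag-emission gate described in this commit.
 * BasicState is a simplified stand-in, not Druid's SupervisorStateManager.BasicState.
 */
public class LagEmissionGate
{
  // Simplified set of supervisor states (assumption: the real state machine has more).
  enum BasicState { PENDING, RUNNING, IDLE, STOPPING, UNHEALTHY_SUPERVISOR, UNHEALTHY_TASKS }

  // States in which lag metrics are emitted after this change.
  private static final Set<BasicState> EMITTING_STATES = EnumSet.of(
      BasicState.RUNNING,
      BasicState.IDLE,
      BasicState.UNHEALTHY_TASKS,
      BasicState.UNHEALTHY_SUPERVISOR
  );

  static boolean shouldEmitLag(boolean suspended, BasicState state)
  {
    // A suspended supervisor never emits lag metrics, whatever its state.
    return !suspended && EMITTING_STATES.contains(state);
  }

  public static void main(String[] args)
  {
    System.out.println(shouldEmitLag(false, BasicState.UNHEALTHY_TASKS)); // true
    System.out.println(shouldEmitLag(true, BasicState.RUNNING));          // false
    System.out.println(shouldEmitLag(false, BasicState.PENDING));         // false
  }
}
```

Using an `EnumSet` keeps the list of emitting states in one place, so widening or narrowing the gate is a one-line change.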
---
docs/operations/metrics.md | 15 ++++++----
.../src/main/resources/defaultMetrics.json | 14 +++++++--
.../src/main/resources/defaultMetrics.json | 7 +++--
.../main/resources/defaultMetricDimensions.json | 7 +++--
.../supervisor/SeekableStreamSupervisor.java | 33 ++++++++++++++++++----
.../java/org/apache/druid/query/DruidMetrics.java | 4 +++
6 files changed, 59 insertions(+), 21 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 2c9f4941a8..5dbdc8d5dd 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -183,9 +183,10 @@ These metrics apply to the [Kafka indexing service](../development/extensions-co
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
-|`ingest/kafka/lag`|Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, should not be a very high number |
-|`ingest/kafka/maxLag`|Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, should not be a very high number |
-|`ingest/kafka/avgLag`|Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, should not be a very high number |
+|`ingest/kafka/lag`|Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource, stream.|Greater than 0, should not be a very high number |
+|`ingest/kafka/maxLag`|Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource, stream.|Greater than 0, should not be a very high number |
+|`ingest/kafka/avgLag`|Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource, stream.|Greater than 0, should not be a very high number |
+|`ingest/kafka/partitionLag`|Partition-wise lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers. Minimum emission period for this metric is a minute.|dataSource, stream, partition.|Greater than 0, should not be a very high number |
### Ingestion metrics for Kinesis
@@ -193,9 +194,10 @@ These metrics apply to the [Kinesis indexing service](../development/extensions-
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
-|`ingest/kinesis/lag/time`|Total lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, up to max Kinesis retention period in milliseconds |
-|`ingest/kinesis/maxLag/time`|Max lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, up to max Kinesis retention period in milliseconds |
-|`ingest/kinesis/avgLag/time`|Average lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, up to max Kinesis retention period in milliseconds |
+|`ingest/kinesis/lag/time`|Total lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|dataSource, stream.|Greater than 0, up to max Kinesis retention period in milliseconds |
+|`ingest/kinesis/maxLag/time`|Max lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|dataSource, stream.|Greater than 0, up to max Kinesis retention period in milliseconds |
+|`ingest/kinesis/avgLag/time`|Average lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|dataSource, stream.|Greater than 0, up to max Kinesis retention period in milliseconds |
+|`ingest/kinesis/partitionLag/time`|Partition-wise lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis. Minimum emission period for this metric is a minute.|dataSource, stream, partition.|Greater than 0, up to max Kinesis retention period in milliseconds |
### Other ingestion metrics
@@ -223,6 +225,7 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
|`ingest/notices/queueSize`|Number of pending notices to be processed by the coordinator|dataSource.|Typically 0 and occasionally in lower single digits. Should not be a very high number. |
|`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor|dataSource| < 1s. |
|`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|dataSource, taskId| < 10 seconds.|
+|`ingest/handoff/time`|Total time taken for each set of segments handed off.|dataSource, taskId, taskType.|Depends on coordinator cycle time|
Note: If the JVM does not support CPU time measurement for the current thread, ingest/merge/cpu and ingest/persists/cpu will be 0.
diff --git a/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json
index ae01473a64..cb646f0565 100644
--- a/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json
+++ b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json
@@ -102,13 +102,21 @@
"dataSource"
],
"ingest/kafka/lag": [
- "dataSource"
+ "dataSource",
+ "stream"
],
"ingest/kafka/maxLag": [
- "dataSource"
+ "dataSource",
+ "stream"
],
"ingest/kafka/avgLag": [
- "dataSource"
+ "dataSource",
+ "stream"
+ ],
+ "ingest/kafka/partitionLag": [
+ "dataSource",
+ "stream",
+ "partition"
],
"task/run/time": [
"dataSource"
diff --git a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
index ad8d675288..88fa434772 100644
--- a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
+++ b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
@@ -53,9 +53,10 @@
"ingest/merge/time" : { "dimensions" : ["dataSource"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds spent merging intermediate segments" },
"ingest/merge/cpu" : { "dimensions" : ["dataSource"], "type" : "timer", "conversionFactor": 1000000000.0, "help": "Cpu time in Seconds spent on merging intermediate segments."},
- "ingest/kafka/lag" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."},
- "ingest/kafka/maxLag" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."},
- "ingest/kafka/avgLag" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."},
+ "ingest/kafka/lag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge", "help": "Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."},
+ "ingest/kafka/maxLag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge", "help": "Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."},
+ "ingest/kafka/avgLag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge", "help": "Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."},
+ "ingest/kafka/partitionLag" : { "dimensions" : ["dataSource", "stream", "partition"], "type" : "gauge", "help": "Partition-wise lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers. Minimum emission period for this metric is a minute."},
"task/success/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of successful tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included."},
"task/failed/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of failed tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included."},
diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
index 3a7767c418..9a7ca1bfcf 100644
--- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
+++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
@@ -53,9 +53,10 @@
"ingest/merge/time" : { "dimensions" : ["dataSource"], "type" : "timer" },
"ingest/merge/cpu" : { "dimensions" : ["dataSource"], "type" : "timer" },
- "ingest/kafka/lag" : { "dimensions" : ["dataSource"], "type" : "gauge" },
- "ingest/kafka/maxLag" : { "dimensions" : ["dataSource"], "type" : "gauge" },
- "ingest/kafka/avgLag" : { "dimensions" : ["dataSource"], "type" : "gauge" },
+ "ingest/kafka/lag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge" },
+ "ingest/kafka/maxLag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge" },
+ "ingest/kafka/avgLag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge" },
+ "ingest/kafka/partitionLag" : { "dimensions" : ["dataSource", "stream", "partition"], "type" : "gauge" },
"task/success/count" : { "dimensions" : ["dataSource"], "type" : "count" },
"task/failed/count" : { "dimensions" : ["dataSource"], "type" : "count" },
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 8a79825a5c..d5e3b62c68 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -86,6 +86,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.metadata.MetadataSupervisorManager;
+import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
@@ -4092,9 +4093,14 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   protected void emitLag()
   {
-    if (spec.isSuspended() || !(stateManager.isSteadyState() || stateManager.isIdle())) {
-      // don't emit metrics if supervisor is suspended or not in a healthy running state
-      // (lag should still available in status report)
+    SupervisorStateManager.State basicState = stateManager.getSupervisorState().getBasicState();
+    boolean unhealthySupervisorOrTasks = SupervisorStateManager.BasicState.UNHEALTHY_TASKS.equals(basicState)
+                                         || SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR.equals(basicState);
+
+    if (spec.isSuspended() || !(stateManager.isSteadyState() || stateManager.isIdle() || unhealthySupervisorOrTasks)) {
+      // Don't emit metrics if the supervisor is suspended. Also,
+      // to emit metrics, the state must be in {healthy steady state, idle or UNHEALTHY_TASKS or UNHEALTHY_SUPERVISOR}
+      // (lag should still be available in the status report)
       return;
     }
     try {
@@ -4112,19 +4118,34 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       }
       LagStats lagStats = computeLags(partitionLags);
+      for (Map.Entry<PartitionIdType, Long> entry : partitionLags.entrySet()) {
+        emitter.emit(
+            ServiceMetricEvent.builder()
+                              .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                              .setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
+                              .setDimension(DruidMetrics.PARTITION, entry.getKey())
+                              .build(
+                                  StringUtils.format("ingest/%s/partitionLag%s", type, suffix),
+                                  entry.getValue()
+                              )
+        );
+      }
       emitter.emit(
           ServiceMetricEvent.builder()
-                            .setDimension("dataSource", dataSource)
+                            .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                            .setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
                             .build(StringUtils.format("ingest/%s/lag%s", type, suffix), lagStats.getTotalLag())
       );
       emitter.emit(
           ServiceMetricEvent.builder()
-                            .setDimension("dataSource", dataSource)
+                            .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                            .setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
                             .build(StringUtils.format("ingest/%s/maxLag%s", type, suffix), lagStats.getMaxLag())
       );
       emitter.emit(
           ServiceMetricEvent.builder()
-                            .setDimension("dataSource", dataSource)
+                            .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                            .setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
                             .build(StringUtils.format("ingest/%s/avgLag%s", type, suffix), lagStats.getAvgLag())
       );
     };
diff --git a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
index 5e6bcb2d5f..b3858258e1 100644
--- a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
@@ -48,6 +48,10 @@ public class DruidMetrics
public static final String DUTY = "duty";
public static final String DUTY_GROUP = "dutyGroup";
+ public static final String STREAM = "stream";
+
+ public static final String PARTITION = "partition";
+
public static int findNumComplexAggs(List<AggregatorFactory> aggs)
{
int retVal = 0;
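The emission pattern in `emitLag()` can be reproduced outside Druid with a short sketch. This is a minimal illustration under stated assumptions, not Druid code: `MetricEvent` is a hypothetical record standing in for `ServiceMetricEvent`, partition IDs are modeled as strings, and the average uses integer division, which may differ from how `computeLags` rounds. It emits one `partitionLag` event per partition tagged with `dataSource`, `stream` and `partition`, followed by the `lag`, `maxLag` and `avgLag` aggregates tagged with `dataSource` and `stream`. The record syntax requires Java 16 or later.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionLagSketch
{
  // Hypothetical stand-in for a metric event: metric name, value and dimensions.
  record MetricEvent(String metric, long value, Map<String, String> dimensions) {}

  static List<MetricEvent> lagEvents(
      String type,        // "kafka" or "kinesis"
      String suffix,      // "" for offsets, "/time" for Kinesis lag time
      String dataSource,
      String stream,
      Map<String, Long> partitionLags
  )
  {
    List<MetricEvent> events = new ArrayList<>();
    long total = 0;
    long max = 0;
    for (Map.Entry<String, Long> entry : partitionLags.entrySet()) {
      long lag = entry.getValue();
      total += lag;
      max = Math.max(max, lag);
      // One partitionLag event per partition, with the partition as an extra dimension.
      events.add(new MetricEvent(
          String.format("ingest/%s/partitionLag%s", type, suffix),
          lag,
          Map.of("dataSource", dataSource, "stream", stream, "partition", entry.getKey())
      ));
    }
    // Integer average across partitions (assumption); 0 when there are no partitions.
    long avg = partitionLags.isEmpty() ? 0 : total / partitionLags.size();
    Map<String, String> dims = Map.of("dataSource", dataSource, "stream", stream);
    events.add(new MetricEvent(String.format("ingest/%s/lag%s", type, suffix), total, dims));
    events.add(new MetricEvent(String.format("ingest/%s/maxLag%s", type, suffix), max, dims));
    events.add(new MetricEvent(String.format("ingest/%s/avgLag%s", type, suffix), avg, dims));
    return events;
  }

  public static void main(String[] args)
  {
    Map<String, Long> lags = new LinkedHashMap<>();
    lags.put("0", 10L);
    lags.put("1", 30L);
    for (MetricEvent e : lagEvents("kafka", "", "wikipedia", "wiki-edits", lags)) {
      System.out.println(e.metric() + " = " + e.value() + " " + e.dimensions());
    }
  }
}
```

Called with `type` set to `kinesis` and `suffix` set to `/time`, the same routine produces names in the `ingest/kinesis/partitionLag/time` family listed in `docs/operations/metrics.md`.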