This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a2013e6566 Enhance streaming ingestion metrics (#13331)
a2013e6566 is described below

commit a2013e656678146094f6c9039471d3933c81313f
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Wed Nov 9 23:44:15 2022 +0530

    Enhance streaming ingestion metrics (#13331)
    
    Changes:
    - Add a metric for partition-wise kafka/kinesis lag for streaming ingestion.
    - Emit lag metrics for streaming ingestion when supervisor is not suspended
      and state is in {RUNNING, IDLE, UNHEALTHY_TASKS, UNHEALTHY_SUPERVISOR}
    - Document metrics
---
 docs/operations/metrics.md                         | 15 ++++++----
 .../src/main/resources/defaultMetrics.json         | 14 +++++++--
 .../src/main/resources/defaultMetrics.json         |  7 +++--
 .../main/resources/defaultMetricDimensions.json    |  7 +++--
 .../supervisor/SeekableStreamSupervisor.java       | 33 ++++++++++++++++++----
 .../java/org/apache/druid/query/DruidMetrics.java  |  4 +++
 6 files changed, 59 insertions(+), 21 deletions(-)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 2c9f4941a8..5dbdc8d5dd 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -183,9 +183,10 @@ These metrics apply to the [Kafka indexing service](../development/extensions-co
 
 |Metric|Description|Dimensions|Normal Value|
 |------|-----------|----------|------------|
-|`ingest/kafka/lag`|Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, should not be a very high number |
-|`ingest/kafka/maxLag`|Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, should not be a very high number |
-|`ingest/kafka/avgLag`|Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, should not be a very high number |
+|`ingest/kafka/lag`|Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource, stream.|Greater than 0, should not be a very high number |
+|`ingest/kafka/maxLag`|Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource, stream.|Greater than 0, should not be a very high number |
+|`ingest/kafka/avgLag`|Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource, stream.|Greater than 0, should not be a very high number |
+|`ingest/kafka/partitionLag`|Partition-wise lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers. Minimum emission period for this metric is a minute.|dataSource, stream, partition.|Greater than 0, should not be a very high number |
 
 ### Ingestion metrics for Kinesis
 
@@ -193,9 +194,10 @@ These metrics apply to the [Kinesis indexing service](../development/extensions-
 
 |Metric|Description|Dimensions|Normal Value|
 |------|-----------|----------|------------|
-|`ingest/kinesis/lag/time`|Total lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, up to max Kinesis retention period in milliseconds |
-|`ingest/kinesis/maxLag/time`|Max lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, up to max Kinesis retention period in milliseconds |
-|`ingest/kinesis/avgLag/time`|Average lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, up to max Kinesis retention period in milliseconds |
+|`ingest/kinesis/lag/time`|Total lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|dataSource, stream.|Greater than 0, up to max Kinesis retention period in milliseconds |
+|`ingest/kinesis/maxLag/time`|Max lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|dataSource, stream.|Greater than 0, up to max Kinesis retention period in milliseconds |
+|`ingest/kinesis/avgLag/time`|Average lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|dataSource, stream.|Greater than 0, up to max Kinesis retention period in milliseconds |
+|`ingest/kinesis/partitionLag/time`|Partition-wise lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis. Minimum emission period for this metric is a minute.|dataSource, stream, partition.|Greater than 0, up to max Kinesis retention period in milliseconds |
 
 ### Other ingestion metrics
 
@@ -223,6 +225,7 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
 |`ingest/notices/queueSize`|Number of pending notices to be processed by the coordinator|dataSource.|Typically 0 and occasionally in lower single digits. Should not be a very high number. |
 |`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor|dataSource| < 1s. |
 |`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|dataSource, taskId| < 10 seconds.|
+|`ingest/handoff/time`|Total time taken for each set of segments handed off.|dataSource, taskId, taskType.|Depends on coordinator cycle time|
 
 
 Note: If the JVM does not support CPU time measurement for the current thread, ingest/merge/cpu and ingest/persists/cpu will be 0.
diff --git a/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json
index ae01473a64..cb646f0565 100644
--- a/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json
+++ b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json
@@ -102,13 +102,21 @@
     "dataSource"
   ],
   "ingest/kafka/lag": [
-    "dataSource"
+    "dataSource",
+    "stream"
   ],
   "ingest/kafka/maxLag": [
-    "dataSource"
+    "dataSource",
+    "stream"
   ],
   "ingest/kafka/avgLag": [
-    "dataSource"
+    "dataSource",
+    "stream"
+  ],
+  "ingest/kafka/partitionLag": [
+    "dataSource",
+    "stream",
+    "partition"
   ],
   "task/run/time": [
     "dataSource"
diff --git a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
index ad8d675288..88fa434772 100644
--- a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
+++ b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
@@ -53,9 +53,10 @@
   "ingest/merge/time" : { "dimensions" : ["dataSource"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds spent merging intermediate segments" },
   "ingest/merge/cpu" : { "dimensions" : ["dataSource"], "type" : "timer", "conversionFactor": 1000000000.0, "help": "Cpu time in Seconds spent on merging intermediate segments."},
 
-  "ingest/kafka/lag" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."},
-  "ingest/kafka/maxLag" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."},
-  "ingest/kafka/avgLag" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."},
+  "ingest/kafka/lag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge", "help": "Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."},
+  "ingest/kafka/maxLag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge", "help": "Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."},
+  "ingest/kafka/avgLag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge", "help": "Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."},
+  "ingest/kafka/partitionLag" : { "dimensions" : ["dataSource", "stream", "partition"], "type" : "gauge", "help": "Partition-wise lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers. Minimum emission period for this metric is a minute."},
 
   "task/success/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of successful tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included."},
   "task/failed/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of failed tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included."},
diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
index 3a7767c418..9a7ca1bfcf 100644
--- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
+++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
@@ -53,9 +53,10 @@
   "ingest/merge/time" : { "dimensions" : ["dataSource"], "type" : "timer" },
   "ingest/merge/cpu" : { "dimensions" : ["dataSource"], "type" : "timer" },
 
-  "ingest/kafka/lag" : { "dimensions" : ["dataSource"], "type" : "gauge" },
-  "ingest/kafka/maxLag" : { "dimensions" : ["dataSource"], "type" : "gauge" },
-  "ingest/kafka/avgLag" : { "dimensions" : ["dataSource"], "type" : "gauge" },
+  "ingest/kafka/lag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge" },
+  "ingest/kafka/maxLag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge" },
+  "ingest/kafka/avgLag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge" },
+  "ingest/kafka/partitionLag" : { "dimensions" : ["dataSource", "stream", "partition"], "type" : "gauge" },
 
   "task/success/count" : { "dimensions" : ["dataSource"], "type" : "count" },
   "task/failed/count" : { "dimensions" : ["dataSource"], "type" : "count" },
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 8a79825a5c..d5e3b62c68 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -86,6 +86,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.metadata.EntryExistsException;
 import org.apache.druid.metadata.MetadataSupervisorManager;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.ordering.StringComparators;
 import org.apache.druid.segment.incremental.ParseExceptionReport;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
@@ -4092,9 +4093,14 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   protected void emitLag()
   {
-    if (spec.isSuspended() || !(stateManager.isSteadyState() || stateManager.isIdle())) {
-      // don't emit metrics if supervisor is suspended or not in a healthy running state
-      // (lag should still available in status report)
+    SupervisorStateManager.State basicState = stateManager.getSupervisorState().getBasicState();
+    boolean unhealthySupervisorOrTasks = SupervisorStateManager.BasicState.UNHEALTHY_TASKS.equals(basicState)
+                                         || SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR.equals(basicState);
+
+    if (spec.isSuspended() || !(stateManager.isSteadyState() || stateManager.isIdle() || unhealthySupervisorOrTasks)) {
+      // Don't emit metrics if the supervisor is suspended. Also,
+      // to emit metrics, the state must be in {healthy steady state, idle or UNHEALTHY_TASKS or UNHEALTHY_SUPERVISOR}
+      // (lag should still be available in the status report)
       return;
     }
     try {
@@ -4112,19 +4118,34 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         }
 
         LagStats lagStats = computeLags(partitionLags);
+        for (Map.Entry<PartitionIdType, Long> entry : partitionLags.entrySet()) {
+          emitter.emit(
+              ServiceMetricEvent.builder()
+                                .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                                .setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
+                                .setDimension(DruidMetrics.PARTITION, entry.getKey())
+                                .build(
+                                    StringUtils.format("ingest/%s/partitionLag%s", type, suffix),
+                                    entry.getValue()
+                                )
+          );
+        }
         emitter.emit(
             ServiceMetricEvent.builder()
-                              .setDimension("dataSource", dataSource)
+                              .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                              .setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
                               .build(StringUtils.format("ingest/%s/lag%s", type, suffix), lagStats.getTotalLag())
         );
         emitter.emit(
             ServiceMetricEvent.builder()
-                              .setDimension("dataSource", dataSource)
+                              .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                              .setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
                               .build(StringUtils.format("ingest/%s/maxLag%s", type, suffix), lagStats.getMaxLag())
         );
         emitter.emit(
             ServiceMetricEvent.builder()
-                              .setDimension("dataSource", dataSource)
+                              .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                              .setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
                               .build(StringUtils.format("ingest/%s/avgLag%s", type, suffix), lagStats.getAvgLag())
         );
       };
diff --git a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
index 5e6bcb2d5f..b3858258e1 100644
--- a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
@@ -48,6 +48,10 @@ public class DruidMetrics
   public static final String DUTY = "duty";
   public static final String DUTY_GROUP = "dutyGroup";
 
+  public static final String STREAM = "stream";
+
+  public static final String PARTITION = "partition";
+
   public static int findNumComplexAggs(List<AggregatorFactory> aggs)
   {
     int retVal = 0;
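
Illustrative note (not part of the commit): the emission rule in the commit
message and the metric naming used in emitLag() can be sketched in isolation
as below. This is a minimal sketch under stated assumptions, not Druid's
implementation. The class LagEmissionSketch, its BasicState enum, and the
methods shouldEmitLag and lagMetricName are hypothetical names invented for
this example. The state check follows the commit message's summary
({RUNNING, IDLE, UNHEALTHY_TASKS, UNHEALTHY_SUPERVISOR}) rather than the
stateManager.isSteadyState() / isIdle() calls in the patch.

```java
import java.util.Locale;

/** Hypothetical, simplified stand-in; not Druid's SupervisorStateManager. */
final class LagEmissionSketch
{
  /** Illustrative subset of supervisor states, named after the commit message. */
  enum BasicState { PENDING, RUNNING, IDLE, SUSPENDED, UNHEALTHY_SUPERVISOR, UNHEALTHY_TASKS }

  private LagEmissionSketch() {}

  /** Returns true when lag metrics would be emitted under the rule in the commit message. */
  static boolean shouldEmitLag(boolean suspended, BasicState state)
  {
    if (suspended) {
      return false; // a suspended supervisor never emits lag metrics
    }
    switch (state) {
      case RUNNING:
      case IDLE:
      case UNHEALTHY_TASKS:
      case UNHEALTHY_SUPERVISOR:
        return true;
      default:
        return false;
    }
  }

  /**
   * Builds a metric name with the same "ingest/%s/<kind>%s" pattern as the patch.
   * The type and suffix values are supplied by the caller in this sketch.
   */
  static String lagMetricName(String type, String kind, String suffix)
  {
    return String.format(Locale.ROOT, "ingest/%s/%s%s", type, kind, suffix);
  }
}
```

With type "kafka" and an empty suffix, lagMetricName yields
"ingest/kafka/partitionLag"; with type "kinesis" and suffix "/time" it yields
"ingest/kinesis/partitionLag/time", matching the names documented in
docs/operations/metrics.md above.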

